Всем привет!
На Хабре было много статей, о том как писать демоны на PHP и другие fork-нутые вещи. Хочу поделится с вами своими наработками на схожую, но все-таки несколько другую тему — управление несколькими PHP процессами.
Для начала небольшой словарь терминов, используемых в статье.
- Job (работа) — задача, выполняемая в отдельном процессе. Наберите в консоли «php test.php» — вот вам job.
- Job Manager или Process Manager — процесс, управляющий задачами. Собирает и обрабатывает их вывод и может посылать сообщения на ввод.
Цель поставленной задачи в том, чтобы иметь возможность влиять на уже запущенные и работающие процессы и получать информацию о ходе их выполнения.
Для запуска новых процессов я использую функцию proc_open, которая позволяет переопределять дескрипторы ввода/вывода для нового процесса. Для управления отдельным процессом был разработан класс Job. Работа характеризуется названием и выполняемой командой.
class Job {
protected $_pid = 0;
protected $_name;
protected $_cmd = '';
protected $_stderr = '/dev/null';
private $_resource = NULL;
private $_pipes = array();
private $_waitpid = TRUE;
public function __construct($cmd, $name = 'job') {
$this->_cmd = $cmd;
$this->_name = $name;
}
public function __destruct() {
// ожидаем завершения процесса
if ($this->_resource) {
if ($this->_waitpid && $this->isRunning()) {
echo "Waiting for job to complete ";
$status = NULL;
pcntl_waitpid($this->_pid, $status);
/*while ($this->isRunning()) {
echo '.';
sleep(1);
}*/
echo "n";
}
}
// закрываем дескрипторы
if (isset($this->_pipes) && is_array($this->_pipes)) {
foreach (array_keys($this->_pipes) as $index ) {
if (is_resource($this->_pipes[$index])) {
fflush($this->_pipes[$index]);
fclose($this->_pipes[$index]);
unset($this->_pipes[$index]);
}
}
}
// закрываем открытый хэндлер
if ($this->_resource) {
proc_close($this->_resource);
unset($this->_resource);
}
}
public function pid() {
return $this->_pid;
}
public function name() {
return $this->_name;
}
// функция чтения из "трубы". $nohup отвечает за блокирование при чтении
private function readPipe($index, $nohup = FALSE) {
if (!isset($this->_pipes[$index])) return FALSE;
if (!is_resource($this->_pipes[$index]) || feof($this->_pipes[$index])) return FALSE;
if ($nohup) {
$data = '';
while ($line = fgets($this->_pipes[$index])) {
$data .= $line;
}
return $data;
}
while ($data = fgets($this->_pipes[$index])) {
echo $data;
}
}
public function pipeline($nohup = FALSE) {
return $this->readPipe(1, $nohup);
}
public function stderr($nohup = FALSE) {
return $this->readPipe(2, $nohup);
}
// запуск задачи в новом процессе
public function execute() {
// определяем откуда будет читать и куда писать процесс
$descriptorspec = array(
0 => array('pipe', 'r'), // stdin
1 => array('pipe', 'w'), // stdout
2 => array('pipe', 'w') // stderr
);
$this->_resource = proc_open('exec '.$this->_cmd, $descriptorspec, $this->_pipes);
// ставим неблокирующий режим всем дескрипторам
stream_set_blocking($this->_pipes[0], 0);
stream_set_blocking($this->_pipes[1], 0);
stream_set_blocking($this->_pipes[2], 0);
if (!is_resource($this->_resource)) return FALSE;
$proc_status = proc_get_status($this->_resource);
$this->_pid = isset($proc_status['pid']) ? $proc_status['pid'] : 0;
}
public function getPipe() {
return $this->_pipes[1];
}
public function getStderr() {
return $this->_pipes[2];
}
public function isRunning() {
if (!is_resource($this->_resource)) return FALSE;
$proc_status = proc_get_status($this->_resource);
return isset($proc_status['running']) && $proc_status['running'];
}
// посылка сигнала процессу
public function signal($sig) {
if (!$this->isRunning()) return FALSE;
posix_kill($this->_pid, $sig);
}
// отправка сообщения в STDIN процесса
public function message($msg) {
if (!$this->isRunning()) return FALSE;
fwrite($this->_pipes[0], $msg);
}
}
Для управления работами создан класс Job_Manager, который по сути является ключевым во всей схеме.
class Job_Manager {
private $_pool_size = 20;
private $_pool = array();
private $_streams = array();
private $_stderr = array();
private $_is_terminated = FALSE;
protected $_dispatch_function = NULL;
public function __construct() {
// init pool
//
}
public function __destruct() {
// destroy pool
foreach (array_keys($this->_pool) as $index) {
$this->stopJob($index);
}
}
// Проверяем статус запущенных задач
private function checkJobs() {
$running_jobs = 0;
foreach ($this->_pool as $index => $job) {
if (!$job->isRunning()) {
echo "Stopping job ".$this->_pool[$index]->name()." ($index)" . PHP_EOL;
$this->stopJob($index);
} else {
$running_jobs++;
}
}
return $running_jobs;
}
private function getFreeIndex() {
foreach ($this->_pool as $index => $job) {
if (!isset($job)) return $index;
}
return count($this->_pool) < $this->_pool_size ? count($this->_pool) : -1;
}
// Запуск новой задачи
public function startJob($cmd, $name = 'job') {
// broadcast existing jobs
$this->checkJobs();
$free_pool_slots = $this->_pool_size - count($this->_pool);
if ($free_pool_slots <= 0) {
// output error "no free slots in the pool"
return -1;
}
$free_slot_index = $this->getFreeIndex();
if ($free_slot_index < 0) {
return -1;
}
echo "Starting job $name ($free_slot_index)" . PHP_EOL;
$this->_pool[$free_slot_index] = new Job($cmd, $name);
$this->_pool[$free_slot_index]->execute();
$this->_streams[$free_slot_index] = $this->_pool[$free_slot_index]->getPipe();
$this->_stderr[$free_slot_index] = $this->_pool[$free_slot_index]->getStderr();
return $free_slot_index;
}
public function stopJob($index) {
if (!isset($this->_pool[$index]))
return FALSE;
unset($this->_streams[$index]);
unset($this->_stderr[$index]);
unset($this->_pool[$index]);
}
public function name($index) {
if (!isset($this->_pool[$index]))
return FALSE;
return $this->_pool[$index]->name();
}
public function pipeline($index, $nohup = FALSE) {
if (!isset($this->_pool[$index]))
return FALSE;
return $this->_pool[$index]->pipeline($nohup);
}
public function stderr($index, $nohup = FALSE) {
if (!isset($this->_pool[$index]))
return FALSE;
return $this->_pool[$index]->stderr($nohup);
}
private function broadcastMessage($msg) {
// sends selected signal to all child processes
foreach ($this->_pool as $pool_index => $job) {
$job->message($msg);
}
}
private function broadcastSignal($sig) {
// sends selected signal to all child processes
foreach ($this->_pool as $pool_index => $job) {
$job->signal($sig);
}
}
// если была зарегистрирована пользовательская функция разбора - используем ее
protected function dispatch($cmd) {
if (is_callable($this->_dispatch_function)) {
call_user_func($this->_dispatch_function, $cmd);
}
}
// регистрация пользовательской функции для разбора
public function registerDispatch($callable) {
if (is_callable($callable)) {
$this->_dispatch_function = $callable;
} else {
trigger_error("$callable is not callable func", E_USER_WARNING);
}
}
// разбираем пользовательский ввод
private function dispatchMain($cmd) {
$parts = explode(' ', $cmd);
$arg = isset($parts[0]) ? $parts[0] : '';
$val = isset($parts[1]) ? $parts[1] : '';
switch ($arg) {
case "exit":
$this->broadcastSignal(SIGTERM);
$this->_is_terminated = TRUE;
break;
case "test":
echo 'sending test' . PHP_EOL;
$this->broadcastMessage('test');
$this->broadcastSignal(SIGUSR1);
break;
case 'kill':
$pool_index = $val !== '' && (int)$val >= 0 ? (int)$val : -1;
if ($pool_index >= 0 && isset($this->_pool[$pool_index])) {
$this->_pool[$pool_index]->signal(SIGKILL);
}
break;
default:
$this->dispatch($cmd);
break;
}
return FALSE;
}
public function process() {
stream_set_blocking(STDIN, 0);
$write = NULL;
$except = NULL;
while (!$this->_is_terminated) {
/*
из-за особенности функции stream_select приходится особым образом работать с массивами дескрипторов
*/
$read = $this->_streams;
$except = $this->_stderr;
$read[$this->_pool_size] = STDIN;
if (is_array($read) && count($read) > 0) {
if (false === ($num_changed_streams = stream_select($read, $write, $except, 2))) {
// oops
} elseif ($num_changed_streams > 0) {
// есть что почитать
if (is_array($read) && count($read) > 0) {
$cmp_array = $this->_streams;
$cmp_array[$this->_pool_size] = STDIN;
foreach ($read as $resource) {
$pool_index = array_search($resource, $cmp_array, TRUE);
if ($pool_index === FALSE) continue;
if ($pool_index == $this->_pool_size) {
// stdin
$content = '';
while ($cmd = fgets(STDIN)) {
if (!$cmd) break;
$content .= $cmd;
}
$content = trim($content);
if ($content) {
// если Process Manager словил на вход какую-то строчку - парсим и решаем что делать
$this->dispatchMain($content);
}
//echo "stdin> " . $cmd;
} else {
// читаем сообщения процессов
$pool_content = $this->pipeline($pool_index, TRUE);
$job_name = $this->name($pool_index);
if ($pool_content) {
echo $job_name ." ($pool_index)" . ': ' . $pool_content;
}
$pool_content = $this->stderr($pool_index, TRUE);
if ($pool_content) {
echo $job_name ." ($pool_index)" . ' [STDERR]: ' . $pool_content;
}
}
}
}
}
}
$this->checkJobs();
}
}
}
Управлять некоторыми абстрактными задачами мы уже научились, осталось реализовать класс для самих исполняемых процессов.
class Executable {
protected $_is_terminated = FALSE;
protected $_cleanup_function = NULL;
public function __construct() {
// выставляем обработчик сигналов
pcntl_signal(SIGTERM, array('Executable', 'signalHandler'));
pcntl_signal(SIGHUP, array('Executable', 'signalHandler'));
pcntl_signal(SIGINT, array('Executable', 'signalHandler'));
pcntl_signal(SIGUSR1, array('Executable', 'signalHandler'));
pcntl_signal(SIGUSR2, array('Executable', 'signalHandler'));
stream_set_blocking(STDIN, 0);
stream_set_blocking(STDOUT, 0);
stream_set_blocking(STDERR, 0);
}
public function __destruct() {
//echo "destructor called in " . get_class($this) . PHP_EOL;
if (!$this->_is_terminated) {
$this->_is_terminated = TRUE;
$this->isTerminated();
}
}
// финальные обработчики - если пользователь пожелает
private function cleanup() {
if (is_callable($this->_cleanup_function)) {
call_user_func($this->_cleanup_function);
}
}
protected function registerCleanup($callable) {
if (is_callable($callable)) {
$this->_cleanup_function = $callable;
} else {
trigger_error("$callable is not callable func", E_USER_WARNING);
}
}
protected function isTerminated() {
pcntl_signal_dispatch();
if ($this->_is_terminated) {
$this->cleanup();
}
return $this->_is_terminated;
}
protected function dispatch($cmd) {
// можно смело парсить входные данные
/*
switch ($cmd) {
}
*/
}
protected function checkStdin() {
$read = array(STDIN);
$write = NULL;
$except = NULL;
if (is_array($read) && count($read) > 0) {
if (false === ($num_changed_streams = stream_select($read, $write, $except, 2))) {
// oops
} elseif ($num_changed_streams > 0) {
if (is_array($read) && count($read) > 0) {
// stdin
$content = '';
while ($cmd = fgets(STDIN)) {
if (!$cmd) break;
$content .= $cmd;
}
$this->dispatch($content);
echo "recieved $content";
//echo "stdin> " . $cmd;
}
}
}
}
// Обработчик сигналов
protected function signalHandler ($signo) {
switch ($signo) {
case SIGTERM:
case SIGHUP:
case SIGINT:
$this->_is_terminated = TRUE;
//echo "exiting in ".get_class($this)."...n";
break;
case SIGUSR1:
//echo "SIGUSR1 recievedn";
$this->checkStdin();
break;
case SIGUSR2:
$this->_is_terminated = TRUE;
echo "[SHUTDOWN] in " . get_class($this) . PHP_EOL;
flush();
exit(1);
break;
default:
// handle all other signals
break;
}
}
}
В качестве примера использования менеджера процессов реализуем «спящий» процесс — скрипт, который будет спать и отписываться по этому поводу в STDOUT
sleep.php
class SleeperTest extends Executable {
public function sleep() {
for($i = 0; !$this->isTerminated() && $i < 10; $i++) {
ob_start();
echo $i . "n";
ob_end_flush();
sleep(5);
}
}
}
$s = new SleeperTest;
$s->sleep();
pm.php
$pm = new Job_Manager;
$pm->startJob('php sleep.php', 'sleeper1');
$pm->startJob('php sleep.php', 'sleeper2');
//
$pm->process();
Используемые в реализации неблокирующие дескрипторы и функция stream_select позволяют избегать проблемы, типичной для разного рода демонов — высокая загрузка ЦПУ в холостом цикле. Предложенный метод лишен этого недостатка, все работает гладко и спокойно.
Автор: xzag