10 месяцев назад я начал делать браузерную игрушку. Выбор пал на cocos js в качестве графики и websocket в качестве общения с сервером. Технология очень понравилась и я на ней организовал всё общение игры с сервером. Использовал для этого эту статью. Но, к сожалению, тот код, который приведен в той статье, нельзя использовать в продакшене. Как выяснилось, уровень проблемы даже не критический, а блокирующий. Всё настолько плохо, что мне пришлось переписывать всё общение с сервером с вебсокетов на longpooling. В итоге я оставил вариант «если у нас браузер не сафари, то использовать websocket, иначе longpolling» и ещё немного ветвления на эту тему.
Так что опыт использования вебсокет в продакшене накопился приличный. И вот недавно случилось событие, которое сподвигло меня написать первую статью на Хабре.
После того, как игрушка была опубликована в социальной сети, я поправил все найденные критические/блокирующие баги и начал приводить всё в порядок в спокойном режиме. Я хочу обратить внимание на то, что этот вот пример — это вообще единственный в интернете гайд, который содержит серверный код, который можно вставить себе в код и использовать его. Ну вот набрать в поисковике «php websocket server» — попробуйте что-то найти, что можно себе поставить.
Внезапно я перечитываю указанную выше статью и в самом начале обнаруживаю ссылки на «phpdaemon» и «ratchet». Ну думаю, давай в спокойном режиме посмотрю на код тамошний. В PhpDeamon в недрах обработки WebSocket соединения небольшое, но безумно важное ветвление на протоколы WebSocket. И там прямо написано для одного case «Safari5 and many non-browser clients». Сказать, что я офигел — это ничего не сказать. Перед глазами пронеслись несколько сотен часов, тонны нервотрёпки и страдания, которые поставили под вопрос вообще проект. Я не поверил, решил проверить.
В течении ~15 часов я вытянул из PhpDeamon минимальный код, связанный с WebSocket (который работает во всех браузерах последней версии, а сам серверный код может работать под высокой нагрузкой) и его постараюсь опубликовать с объяснениями. Чтобы другие люди не испытали те мучения, через которые мне пришлось пройти. Да, кусок кода получился не маленький, но извините: WebSocket он на клиентской части очень простой, а на стороне сервера всё довольно объёмно (скажем отдельное «спасибо» разработчикам Сафари). Также в связи с тем, что область применения WebSocket — это в первую очередь игры, важен вопрос неблокирующего использования серверного сокета — это бонусная сложность, которая никак здесь не рассматривается, хотя и очень важна.
Тестовое приложение я хотел написать без объектов, чтобы было понятнее. Но, к сожалению, такой подход в данном примере расплодит много повторяющегося кода, поэтому пришлось добавить 1 класс и 3 его наследника. Остальное всё без объектов.
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<title>WebSocket test page</title>
</head>
<body onload="create();">
<script type="text/javascript">
function create() {
// Example
ws = new WebSocket('ws://'+document.domain+':8081/');
ws.onopen = function () {document.getElementById('log').innerHTML += 'WebSocket opened <br/>';}
ws.onmessage = function (e) {document.getElementById('log').innerHTML += 'WebSocket message: '+e.data+' <br/>';}
ws.onclose = function () {document.getElementById('log').innerHTML += 'WebSocket closed <br/>';}
}
</script>
<button onclick="create();">Create WebSocket</button>
<button onclick="ws.send('ping');">Send ping</button>
<button onclick="ws.close();">Close WebSocket</button>
<div id="log" style="width:300px; height: 300px; border: 1px solid #999999; overflow:auto;"></div>
</body>
</html>
В моей игре мне пришлось использовать 3 сокет сервера. Для websocket, для worker`ов и для longpooling. В игре очень много математики, поэтому надо было делать вёркеры и выдавать им задачи на вычисления. Так вот к чему это. Что stream_select для них всех должен быть общий, иначе будут лаги или безумное использование процессора. Это знание тоже было получено взамен кучи истраченных нервов.
$master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr);
if (!$master) die("$errstr ($errno)n");
$sockets = array($master);
stream_set_blocking($master, false); // Относительно этой команды я не уверен, потому что мастер из сокетов читает только новые соединения, и для чтения используется "stream_socket_accept". Вариант, что весь сервис будет подвешен на несколько секунд из-за того, что клиент не торопится соединятся - категорически неприемлемо.
while (true) {
$read = $sockets;
$write = $except = array();
if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) {
var_dump('stream_select error');
break;
// Сделать выход из цикла, а не "die", потому что в продакшине скорей всего этот код будет выполняться как сервис и при команде "/etc/init.d/game restart" тут 100% будет этот case, так вот надо дать "pcntl" код нормально отработать и не мешать ему.
}
foreach ($read as $socket) {
$index_socket = array_search($socket, $sockets);
if ($index_socket == 0) {
// Новое соединение
continue;
}
// Тут будет обработка сообщений клиентов
}
}
Соединение с новыми клиентами вполне себе стандартный код, но вот из-за того, что сокеты у нас в неблокирующем режиме, нужно написать кучу кода, который по кусочкам соберёт все входящие данные и, когда данных будет достаточно, обработает их, поймёт какой протокол надо использовать и переключится на использование этого протокола. Одна эта задача — уже гора кода, и в PhpDeamon нагородили много кода, который к WebSocket отношения не имеет (они же там 8 разных серверов умеют подымать). Удалось многое отрезать и упростить в этой теме. Оставил только то, что относится к WebSocket.
class ws {
const MAX_BUFFER_SIZE = 1024 * 1024;
protected $socket;
/**
* @var array _SERVER
*/
public $server = [];
protected $headers = [];
protected $closed = false;
protected $unparsed_data = '';
private $current_header;
private $unread_lines = array();
protected $extensions = [];
protected $extensionsCleanRegex = '/(?:^|W)x-webkit-/iS';
/**
* @var integer Current state
*/
protected $state = 0; // stream state of the connection (application protocol level)
/**
* Alias of STATE_STANDBY
*/
const STATE_ROOT = 0;
/**
* Standby state (default state)
*/
const STATE_STANDBY = 0;
/**
* State: first line
*/
const STATE_FIRSTLINE = 1;
/**
* State: headers
*/
const STATE_HEADERS = 2;
/**
* State: content
*/
const STATE_CONTENT = 3;
/**
* State: prehandshake
*/
const STATE_PREHANDSHAKE = 5;
/**
* State: handshaked
*/
const STATE_HANDSHAKED = 6;
public function get_state() {
return $this->state;
}
public function closed() {
return $this->closed;
}
protected function close() {
if ($this->closed) return;
var_dump('self close');
fclose($this->socket);
$this->closed = true;
}
public function __construct($socket) {
stream_set_blocking($socket, false);
$this->socket = $socket;
}
private function read_line() {
$lines = explode(PHP_EOL, $this->unparsed_data);
$last_line = $lines[count($lines)-1];
unset($lines[count($lines) - 1]);
foreach ($lines as $line) {
$this->unread_lines[] = $line;
}
$this->unparsed_data = $last_line;
if (count($this->unread_lines) != 0) {
return array_shift($this->unread_lines);
} else {
return null;
}
}
public function on_receive_data() {
if ($this->closed) return;
$data = stream_socket_recvfrom($this->socket, MAX_BUFFER_SIZE);
if (is_string($data)) {
$this->unparsed_data .= $data;
}
}
/**
* Called when new data received.
* @return void
*/
public function on_read() {
if ($this->closed) return;
if ($this->state === self::STATE_STANDBY) {
$this->state = self::STATE_FIRSTLINE;
}
if ($this->state === self::STATE_FIRSTLINE) {
if (!$this->http_read_first_line()) {
return;
}
$this->state = self::STATE_HEADERS;
}
if ($this->state === self::STATE_HEADERS) {
if (!$this->http_read_headers()) {
return;
}
if (!$this->http_process_headers()) {
$this->close();
return;
}
$this->state = self::STATE_CONTENT;
}
if ($this->state === self::STATE_CONTENT) {
$this->state = self::STATE_PREHANDSHAKE;
}
}
/**
* Read first line of HTTP request
* @return boolean|null Success
*/
protected function http_read_first_line() {
if (($l = $this->read_line()) === null) {
return null;
}
$e = explode(' ', $l);
$u = isset($e[1]) ? parse_url($e[1]) : false;
if ($u === false) {
$this->bad_request();
return false;
}
if (!isset($u['path'])) {
$u['path'] = null;
}
if (isset($u['host'])) {
$this->server['HTTP_HOST'] = $u['host'];
}
$srv = & $this->server;
$srv['REQUEST_METHOD'] = $e[0];
$srv['REQUEST_TIME'] = time();
$srv['REQUEST_TIME_FLOAT'] = microtime(true);
$srv['REQUEST_URI'] = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : '');
$srv['DOCUMENT_URI'] = $u['path'];
$srv['PHP_SELF'] = $u['path'];
$srv['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null;
$srv['SCRIPT_NAME'] = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
$srv['SERVER_PROTOCOL'] = isset($e[2]) ? $e[2] : 'HTTP/1.1';
$srv['REMOTE_ADDR'] = null;
$srv['REMOTE_PORT'] = null;
return true;
}
/**
* Read headers line-by-line
* @return boolean|null Success
*/
protected function http_read_headers() {
while (($l = $this->read_line()) !== null) {
if ($l === '') {
return true;
}
$e = explode(': ', $l);
if (isset($e[1])) {
$this->current_header = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_']));
$this->server[$this->current_header] = $e[1];
} elseif (($e[0][0] === "t" || $e[0][0] === "x20") && $this->current_header) {
// multiline header continued
$this->server[$this->current_header] .= $e[0];
} else {
// whatever client speaks is not HTTP anymore
$this->bad_request();
return false;
}
}
}
/**
* Process headers
* @return bool
*/
protected function http_process_headers() {
$this->state = self::STATE_PREHANDSHAKE;
if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) {
$str = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']);
$str = preg_replace($this->extensionsCleanRegex, '', $str);
$this->extensions = explode(', ', $str);
}
if (!isset($this->server['HTTP_CONNECTION'])
|| (!preg_match('~(?:^|W)Upgrade(?:W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade")
|| !isset($this->server['HTTP_UPGRADE'])
|| (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important
) {
$this->close();
return false;
}
if (isset($this->server['HTTP_COOKIE'])) {
self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie);
}
if (isset($this->server['QUERY_STRING'])) {
self::parse_str($this->server['QUERY_STRING'], $this->get);
}
// ----------------------------------------------------------
// Protocol discovery, based on HTTP headers...
// ----------------------------------------------------------
if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI
if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14)
$this->switch_to_protocol('v13');
} elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol
$this->switch_to_protocol('v13');
} else {
error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr
$this->close();
return false;
}
} elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
$this->switch_to_protocol('ve');
} else { // Defaulting to HIXIE (Safari5 and many non-browser clients...)
$this->switch_to_protocol('v0');
}
// ----------------------------------------------------------
// End of protocol discovery
// ----------------------------------------------------------
return true;
}
private function switch_to_protocol($protocol) {
$class = 'ws_'.$protocol;
$this->new_instance = new $class($this->socket);
$this->new_instance->state = $this->state;
$this->new_instance->unparsed_data = $this->unparsed_data;
$this->new_instance->server = $this->server;
}
/**
* Send Bad request
* @return void
*/
public function bad_request() {
$this->write("400 Bad Requestrnrn<html><head><title>400 Bad Request</title></head><body bgcolor="white"><center><h1>400 Bad Request</h1></center></body></html>");
$this->close();
}
/**
* Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX
* @param string $s String to parse
* @param array &$var Reference to the resulting array
* @param boolean $header Header-style string
* @return void
*/
public static function parse_str($s, &$var, $header = false)
{
static $cb;
if ($cb === null) {
$cb = function ($m) {
return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8'));
};
}
if ($header) {
$s = strtr($s, self::$hvaltr);
}
if (
(stripos($s, '%u') !== false)
&& preg_match('~(%u[a-fd]{4}|%[c-f][a-fd](?!%[89a-f][a-fd]))~is', $s, $m)
) {
$s = preg_replace_callback('~%(u[a-fd]{4}|[a-fd]{2})~i', $cb, $s);
}
parse_str($s, $var);
}
/**
* Send data to the connection. Note that it just writes to buffer that flushes at every baseloop
* @param string $data Data to send
* @return boolean Success
*/
public function write($data) {
if ($this->closed) return false;
return stream_socket_sendto($this->socket, $data) == 0;
}
}
Смысл этого класса в таком урезанном виде — в конструкторе установить неблокирующий режим для соединения с клиентом. Далее в основном цикле, каждый раз, когда приходят данные — сразу их прочитать и положить (дополнить) в «unparsed_data» переменную (это метод on_receive_data). Важно понимать, что если мы выйдем за размеры MAX_BUFFER_SIZE, то вообще ничего страшного не случится. Можно в итоговом примере, что тут будет, поставить его значение, скажем, «5» и убедится, что всё по-прежнему работает. Просто данные из буфера на первом шаге будут проигнорированы — они ведь неполные будут, и со второго или пятого или сотого захода наберутся, наконец, все принятые данные и будут обработаны. При этом stream_select в основном цикле ждать не будет даже микросекунды, пока все данные не будут извлечены. Константу надо подобрать такую, чтобы 95% ожидаемых данных читались целиком.
Далее в основном цикле (после получения очередной порции данных) мы пробуем накопленные данные обработать (это метод on_read). В классе «ws» метод «on_read» состоит по сути из трёх шагов: «читаем первую строку и готовим переменные окружения», «читаем все заголовки», «обрабатываем все заголовки». Первые 2 пояснять не надо, но написаны они довольно объёмно потому, что мы в неблокирующем режиме и надо быть готовым к тому, что данные оборваны в любом месте. Обработка заголовков сначала проверяет формат запроса правильный или нет, а потом по заголовкам определяет протокол, по которому будет общаться с клиентом. В итоге должны дёрнуть метод switch_to_protocol. Этот метод внутри себя сформирует экземпляр класса «ws_<протокол>» и подготовит его для отдачи в основной цикл.
В основном цикле далее надо собственно проверить: а не надо ли подменить объект (если кто-то может предложить реализацию этого места лучше — всегда пожалуйста).
Далее в основном цикле надо поставить проверку: а не закрыт ли сокет. Если закрыт, то очистить память (об этом дальнее в следующем блоке).
require('ws.php');
require('ws_v0.php');
require('ws_v13.php');
require('ws_ve.php');
$master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr);
if (!$master) die("$errstr ($errno)n");
$sockets = array($master);
/**
* @var ws[] $connections
*/
$connections = array();
stream_set_blocking($master, false);
/**
* @param ws $connection
* @param $data
* @param $type
*/
$my_callback = function($connection, $data, $type) {
var_dump('my ws data: ['.$data.'/'.$type.']');
$connection->send_frame('test '.time());
};
while (true) {
$read = $sockets;
$write = $except = array();
if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) {
var_dump('stream_select error');
break;
}
foreach ($read as $socket) {
$index_socket = array_search($socket, $sockets);
if ($index_socket == 0) {
// Новое соединение
if ($socket_new = stream_socket_accept($master, -1)) {
$connection = new ws($socket_new, $my_callback);
$sockets[] = $socket_new;
$index_new_socket = array_search($socket_new, $sockets);
$connections[$index_new_socket] = &$connection;
$index_socket = $index_new_socket;
} else {
// Я так и не понял что в этом случае надо делать
error_log('stream_socket_accept');
var_dump('error stream_socket_accept');
continue;
}
}
$connection = &$connections[$index_socket];
$connection->on_receive_data();
$connection->on_read();
if ($connection->get_state() == ws::STATE_PREHANDSHAKE) {
$connection = $connection->get_new_instance();
$connections[$index_socket] = &$connection;
$connection->on_read();
}
if ($connection->closed()) {
unset($sockets[$index_socket]);
unset($connections[$index_socket]);
unset($connection);
var_dump('close '.$index_socket);
}
}
}
Тут добавлен "$my_callback" — это наш custom обработчик сообщений от клиента. Разумеется в продакшине можно завернуть это всё в объекты всякие, а тут чтобы было понятнее просто переменная-функция. О ней чуть позже подробнее.
Реализована обработка нового соединения и реализовано основное тело цикла, о котором я чуть выше писал.
Я хочу обратить внимание на код сервера тут. Что если прочтённые данные из сокета — это пустая строка (да, разумеется я видел там в update проверку на пустую строку), то сокет надо закрыть. Ох, я даже не знаю, сколько этот момет попил мне кровушки и скольких пользователей я потерял. Внезапнейшим образом Сафари отправляет пустую строку и считает это нормой, а этот код берёт и закрывает соединение пользователю. Яндекс-браузер иногда ведёт себя так же. Уж не знаю почему, но в этом случае для Сафари WebSocket остаётся зависшим, то есть он не закрывается, не открывается — просто висит и всё. Вы уже заметили, что я неравнодушен к этому волшебному браузеру? Мне вспоминается, как я верстал под IE6 — примерно такие же ощущения.
Теперь о том, зачем я использую array_search и синхронизирую массив $sockets и массив $connections. Дело в том, что stream_select жизненно необходим чистый массив $sockets и никак иначе. Но как-то надо же связать конкретный сокет из массива $sockets с объектом «ws». Перепробовал кучу вариантов — в итоге остановился на таком варианте, что есть 2 массива, которые постоянно синхронизированы по ключам. В одном массиве неоходимые чистые сокеты для stream_select, а во втором экземпляры класса «ws» или его наследники. Если кто-то может предложить это место лучше — предлагайте.
Ещё отдельно надо отметить случай, когда stream_socket_accept зафэйлился. Я так понимаю, теоретически это может быть только в том случае, если мастер сокет у нас в неблокирующем режиме, и приехало недостаточно данных для соединения клиента. Поэтому просто ничего не делаем.
class ws {
private static $hvaltr = ['; ' => '&', ';' => '&', ' ' => '%20'];
const maxAllowedPacket = 1024 * 1024 * 1024;
const MAX_BUFFER_SIZE = 1024 * 1024;
protected $socket;
/**
* @var array _SERVER
*/
public $server = [];
protected $on_frame_user = null;
protected $handshaked = false;
protected $headers = [];
protected $headers_sent = false;
protected $closed = false;
protected $unparsed_data = '';
private $current_header;
private $unread_lines = array();
/**
* @var ws|null
*/
private $new_instance = null;
protected $extensions = [];
protected $extensionsCleanRegex = '/(?:^|W)x-webkit-/iS';
/**
* @var integer Current state
*/
protected $state = 0; // stream state of the connection (application protocol level)
/**
* Alias of STATE_STANDBY
*/
const STATE_ROOT = 0;
/**
* Standby state (default state)
*/
const STATE_STANDBY = 0;
/**
* State: first line
*/
const STATE_FIRSTLINE = 1;
/**
* State: headers
*/
const STATE_HEADERS = 2;
/**
* State: content
*/
const STATE_CONTENT = 3;
/**
* State: prehandshake
*/
const STATE_PREHANDSHAKE = 5;
/**
* State: handshaked
*/
const STATE_HANDSHAKED = 6;
public function get_state() {
return $this->state;
}
public function get_new_instance() {
return $this->new_instance;
}
public function closed() {
return $this->closed;
}
protected function close() {
if ($this->closed) return;
var_dump('self close');
fclose($this->socket);
$this->closed = true;
}
public function __construct($socket, $on_frame_user = null) {
stream_set_blocking($socket, false);
$this->socket = $socket;
$this->on_frame_user = $on_frame_user;
}
private function read_line() {
$lines = explode(PHP_EOL, $this->unparsed_data);
$last_line = $lines[count($lines)-1];
unset($lines[count($lines) - 1]);
foreach ($lines as $line) {
$this->unread_lines[] = $line;
}
$this->unparsed_data = $last_line;
if (count($this->unread_lines) != 0) {
return array_shift($this->unread_lines);
} else {
return null;
}
}
public function on_receive_data() {
if ($this->closed) return;
$data = stream_socket_recvfrom($this->socket, self::MAX_BUFFER_SIZE);
if (is_string($data)) {
$this->unparsed_data .= $data;
}
}
/**
* Called when new data received.
* @return void
*/
public function on_read() {
if ($this->closed) return;
if ($this->state === self::STATE_STANDBY) {
$this->state = self::STATE_FIRSTLINE;
}
if ($this->state === self::STATE_FIRSTLINE) {
if (!$this->http_read_first_line()) {
return;
}
$this->state = self::STATE_HEADERS;
}
if ($this->state === self::STATE_HEADERS) {
if (!$this->http_read_headers()) {
return;
}
if (!$this->http_process_headers()) {
$this->close();
return;
}
$this->state = self::STATE_CONTENT;
}
if ($this->state === self::STATE_CONTENT) {
$this->state = self::STATE_PREHANDSHAKE;
}
}
/**
* Read first line of HTTP request
* @return boolean|null Success
*/
protected function http_read_first_line() {
if (($l = $this->read_line()) === null) {
return null;
}
$e = explode(' ', $l);
$u = isset($e[1]) ? parse_url($e[1]) : false;
if ($u === false) {
$this->bad_request();
return false;
}
if (!isset($u['path'])) {
$u['path'] = null;
}
if (isset($u['host'])) {
$this->server['HTTP_HOST'] = $u['host'];
}
$address = explode(':', stream_socket_get_name($this->socket, true)); //получаем адрес клиента
$srv = & $this->server;
$srv['REQUEST_METHOD'] = $e[0];
$srv['REQUEST_TIME'] = time();
$srv['REQUEST_TIME_FLOAT'] = microtime(true);
$srv['REQUEST_URI'] = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : '');
$srv['DOCUMENT_URI'] = $u['path'];
$srv['PHP_SELF'] = $u['path'];
$srv['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null;
$srv['SCRIPT_NAME'] = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
$srv['SERVER_PROTOCOL'] = isset($e[2]) ? $e[2] : 'HTTP/1.1';
$srv['REMOTE_ADDR'] = $address[0];
$srv['REMOTE_PORT'] = $address[1];
return true;
}
/**
* Read headers line-by-line
* @return boolean|null Success
*/
protected function http_read_headers() {
while (($l = $this->read_line()) !== null) {
if ($l === '') {
return true;
}
$e = explode(': ', $l);
if (isset($e[1])) {
$this->current_header = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_']));
$this->server[$this->current_header] = $e[1];
} elseif (($e[0][0] === "t" || $e[0][0] === "x20") && $this->current_header) {
// multiline header continued
$this->server[$this->current_header] .= $e[0];
} else {
// whatever client speaks is not HTTP anymore
$this->bad_request();
return false;
}
}
}
/**
* Process headers
* @return bool
*/
protected function http_process_headers() {
$this->state = self::STATE_PREHANDSHAKE;
if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) {
$str = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']);
$str = preg_replace($this->extensionsCleanRegex, '', $str);
$this->extensions = explode(', ', $str);
}
if (!isset($this->server['HTTP_CONNECTION'])
|| (!preg_match('~(?:^|W)Upgrade(?:W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade")
|| !isset($this->server['HTTP_UPGRADE'])
|| (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important
) {
$this->close();
return false;
}
/*
if (isset($this->server['HTTP_COOKIE'])) {
self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie);
}
if (isset($this->server['QUERY_STRING'])) {
self::parse_str($this->server['QUERY_STRING'], $this->get);
}
*/
// ----------------------------------------------------------
// Protocol discovery, based on HTTP headers...
// ----------------------------------------------------------
if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI
if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14)
$this->switch_to_protocol('v13');
} elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol
$this->switch_to_protocol('v13');
} else {
error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr
$this->close();
return false;
}
} elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
$this->switch_to_protocol('ve');
} else { // Defaulting to HIXIE (Safari5 and many non-browser clients...)
$this->switch_to_protocol('v0');
}
// ----------------------------------------------------------
// End of protocol discovery
// ----------------------------------------------------------
return true;
}
private function switch_to_protocol($protocol) {
$class = 'ws_'.$protocol;
$this->new_instance = new $class($this->socket);
$this->new_instance->state = $this->state;
$this->new_instance->unparsed_data = $this->unparsed_data;
$this->new_instance->server = $this->server;
$this->new_instance->on_frame_user = $this->on_frame_user;
}
/**
* Send Bad request
* @return void
*/
public function bad_request() {
$this->write("400 Bad Requestrnrn<html><head><title>400 Bad Request</title></head><body bgcolor="white"><center><h1>400 Bad Request</h1></center></body></html>");
$this->close();
}
/**
* Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX
* @param string $s String to parse
* @param array &$var Reference to the resulting array
* @param boolean $header Header-style string
* @return void
*/
public static function parse_str($s, &$var, $header = false) {
static $cb;
if ($cb === null) {
$cb = function ($m) {
return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8'));
};
}
if ($header) {
$s = strtr($s, self::$hvaltr);
}
if (
(stripos($s, '%u') !== false)
&& preg_match('~(%u[a-fd]{4}|%[c-f][a-fd](?!%[89a-f][a-fd]))~is', $s, $m)
) {
$s = preg_replace_callback('~%(u[a-fd]{4}|[a-fd]{2})~i', $cb, $s);
}
parse_str($s, $var);
}
/**
* Send data to the connection. Note that it just writes to buffer that flushes at every baseloop
* @param string $data Data to send
* @return boolean Success
*/
public function write($data) {
if ($this->closed) return false;
return stream_socket_sendto($this->socket, $data) == 0;
}
/**
* Будте любезны в отнаследованном классе реализовать этот метод
* @return bool
*/
protected function send_handshake_reply() {
return false;
}
/**
* Called when we're going to handshake.
* @return boolean Handshake status
*/
public function handshake() {
$extra_headers = '';
foreach ($this->headers as $k => $line) {
if ($k !== 'STATUS') {
$extra_headers .= $line . "rn";
}
}
if (!$this->send_handshake_reply($extra_headers)) {
error_log(get_class($this) . '::' . __METHOD__ . ' : Handshake protocol failure for client ""'); // $this->addr
$this->close();
return false;
}
$this->handshaked = true;
$this->headers_sent = true;
$this->state = static::STATE_HANDSHAKED;
return true;
}
/**
* Read from buffer without draining
* @param integer $n Number of bytes to read
* @param integer $o Offset
* @return string|false
*/
public function look($n, $o = 0) {
if (strlen($this->unparsed_data) <= $o) {
return '';
}
return substr($this->unparsed_data, $o, $n);
}
/**
* Convert bytes into integer
* @param string $str Bytes
* @param boolean $l Little endian? Default is false
* @return integer
*/
public static function bytes2int($str, $l = false) {
if ($l) {
$str = strrev($str);
}
$dec = 0;
$len = strlen($str);
for ($i = 0; $i < $len; ++$i) {
$dec += ord(substr($str, $i, 1)) * pow(0x100, $len - $i - 1);
}
return $dec;
}
/**
* Drains buffer
* @param integer $n Numbers of bytes to drain
* @return boolean Success
*/
public function drain($n) {
$ret = substr($this->unparsed_data, 0, $n);
$this->unparsed_data = substr($this->unparsed_data, $n);
return $ret;
}
/**
* Read data from the connection's buffer
* @param integer $n Max. number of bytes to read
* @return string|false Readed data
*/
public function read($n) {
if ($n <= 0) {
return '';
}
$read = $this->drain($n);
if ($read === '') {
return false;
}
return $read;
}
/**
* Reads all data from the connection's buffer
* @return string Readed data
*/
public function read_unlimited() {
$ret = $this->unparsed_data;
$this->unparsed_data = '';
return $ret;
}
/**
* Searches first occurence of the string in input buffer
* @param string $what Needle
* @param integer $start Offset start
* @param integer $end Offset end
* @return integer Position
*/
public function search($what, $start = 0, $end = -1) {
return strpos($this->unparsed_data, $what, $start);
}
/**
* Called when new frame received.
* @param string $data Frame's data.
* @param string $type Frame's type ("STRING" OR "BINARY").
* @return boolean Success.
*/
public function on_frame($data, $type) {
if (is_callable($this->on_frame_user)) {
call_user_func($this->on_frame_user, $this, $data, $type);
}
return true;
}
public function send_frame($data, $type = null, $cb = null) {
return false;
}
/**
* Get real frame type identificator
* @param $type
* @return integer
*/
public function get_frame_type($type) {
if (is_int($type)) {
return $type;
}
if ($type === null) {
$type = 'STRING';
}
$frametype = @constant(get_class($this) . '::' . $type);
if ($frametype === null) {
error_log(__METHOD__ . ' : Undefined frametype "' . $type . '"');
}
return $frametype;
}
}
По сути тут добавлены 3 вещи: «соединение с клиентом на уровне веб сокета», «получение сообщения от клиента», «отправка сообщения клиенту».
Для начало немного теории и терминологии. «Handshake» — это с точки зрения веб сокетов процедура установления соединения поверх http. Надо ведь решить кучу вопросов: как пробиться сквозь гущу прокси и кэшэй, как защитится от злых хакеров. И термин «frame» — это кусок данных в расшифрованном виде, это сообщение от клиента или сообщение для клиента. Возможно, об этом стоило написать в начале статьи, но из-за этих вот «frame» делать сокет сервер в блокирующем режиме сокетов имхо бессмысленно. То, как сделан этот момент вот тут — это лишило меня сна не на одну ночь. В той статье не рассматривается вариант, что frame приехал не полностью или их приехало сразу два. И то и то, между прочим, вполне себе типичная ситуация, как показали логи игры.
Теперь к деталям.
Соединение с клиентом на уровне веб сокета — предполагается, что протокол (например, ws_v0) перекроет метод «on_read» и внутри себя дёрнет «handshake», когда данных будет достаточно. Далее кусок «handshake» в родителе. Далее дёргается метод «send_handshake_reply», который должен быть реализован в протоколе. Этот вот «send_handshake_reply» должен такое ответить клиенту, чтобы тот понял, что «соединение установлено», нормальным браузерам — нормальный ответ, а для Сафари — особый ответ.
Получение сообщения от клиента. Обращаю внимание, что глупые клиенты могут реализовать такой вариант, что соединение не установлено, а сообщение от пользователя уже пришло. Поэтому надо бережно относится к «unparsed_data» переменной. В каждом протоколе метод «on_read» должен понять размер передаваемого frame, убедиться, что frame целиком приехал, расшифровать приехавший frame в сообщение пользователя. В каждом протоколе это делается очень по-разному и очень кучеряво (мы ж не знаем, приехал frame полностью или нет, плюс нельзя откусить ни байта следующего frame). Далее внутри «on_read», когда данные клиента получены и расшифрованы и определён их тип (да-да и такое предусмотрено), дёргаем метод «on_frame», который внутри класса «ws», тот, в свою очередь, дёрнет custom callback (функция $my_callback, перед основным циклом которая). И в итоге $my_callback получает сообщение от клиента.
Отправка сообщения клиенту. Просто дёргается метод «send_frame», который должен быть реализован внутри протокола. Тут просто шифруем сообщение и отправляем пользователю. Разные протоколы шифруют по-разному.
Теперь прилагаю 3 протокола «v13», «v0», «ve»:
class ws_v13 extends ws {
const CONTINUATION = 0;
const STRING = 0x1;
const BINARY = 0x2;
const CONNCLOSE = 0x8;
const PING = 0x9;
const PONG = 0xA;
protected static $opcodes = [
0 => 'CONTINUATION',
0x1 => 'STRING',
0x2 => 'BINARY',
0x8 => 'CONNCLOSE',
0x9 => 'PING',
0xA => 'PONG',
];
protected $outgoingCompression = 0;
protected $framebuf = '';
/**
* Apply mask
* @param $data
* @param string|false $mask
* @return mixed
*/
public function mask($data, $mask) {
for ($i = 0, $l = strlen($data), $ml = strlen($mask); $i < $l; $i++) {
$data[$i] = $data[$i] ^ $mask[$i % $ml];
}
return $data;
}
/**
* Sends a frame.
* @param string $data Frame's data.
* @param string $type Frame's type. ("STRING" OR "BINARY")
* @param callable $cb Optional. Callback called when the frame is received by client.
* @callback $cb ( )
* @return boolean Success.
*/
public function send_frame($data, $type = null, $cb = null) {
if (!$this->handshaked) {
return false;
}
if ($this->closed && $type !== 'CONNCLOSE') {
return false;
}
/*if (in_array($type, ['STRING', 'BINARY']) && ($this->outgoingCompression > 0) && in_array('deflate-frame', $this->extensions)) {
//$data = gzcompress($data, $this->outgoingCompression);
//$rsv1 = 1;
}*/
$fin = 1;
$rsv1 = 0;
$rsv2 = 0;
$rsv3 = 0;
$this->write(chr(bindec($fin . $rsv1 . $rsv2 . $rsv3 . str_pad(decbin($this->get_frame_type($type)), 4, '0', STR_PAD_LEFT))));
$dataLength = strlen($data);
$isMasked = false;
$isMaskedInt = $isMasked ? 128 : 0;
if ($dataLength <= 125) {
$this->write(chr($dataLength + $isMaskedInt));
} elseif ($dataLength <= 65535) {
$this->write(chr(126 + $isMaskedInt) . // 126 + 128
chr($dataLength >> 8) .
chr($dataLength & 0xFF));
} else {
$this->write(chr(127 + $isMaskedInt) . // 127 + 128
chr($dataLength >> 56) .
chr($dataLength >> 48) .
chr($dataLength >> 40) .
chr($dataLength >> 32) .
chr($dataLength >> 24) .
chr($dataLength >> 16) .
chr($dataLength >> 8) .
chr($dataLength & 0xFF));
}
if ($isMasked) {
$mask = chr(mt_rand(0, 0xFF)) .
chr(mt_rand(0, 0xFF)) .
chr(mt_rand(0, 0xFF)) .
chr(mt_rand(0, 0xFF));
$this->write($mask . $this->mask($data, $mask));
} else {
$this->write($data);
}
if ($cb !== null) {
$cb();
}
return true;
}
/**
* Sends a handshake message reply
* @param string Received data (no use in this class)
* @return boolean OK?
*/
public function send_handshake_reply($extraHeaders = '') {
if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY']) || !isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) {
return false;
}
if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '13' && $this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '8') {
return false;
}
if (isset($this->server['HTTP_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = $this->server['HTTP_ORIGIN'];
}
if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
}
$this->write("HTTP/1.1 101 Switching Protocolsrn"
. "Upgrade: WebSocketrn"
. "Connection: Upgradern"
. "Date: " . date('r') . "rn"
. "Sec-WebSocket-Origin: " . $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] . "rn"
. "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "rn"
. "Sec-WebSocket-Accept: " . base64_encode(sha1(trim($this->server['HTTP_SEC_WEBSOCKET_KEY']) . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true)) . "rn"
);
if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
$this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."rn");
}
$this->write($extraHeaders."rn");
return true;
}
/**
* Called when new data received
* @see http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10#page-16
* @return void
*/
public function on_read() {
if ($this->closed) return;
if ($this->state === self::STATE_PREHANDSHAKE) {
if (!$this->handshake()) {
return;
}
}
if ($this->state === self::STATE_HANDSHAKED) {
while (($buflen = strlen($this->unparsed_data)) >= 2) {
$first = ord($this->look(1)); // first byte integer (fin, opcode)
$firstBits = decbin($first);
$opcode = (int)bindec(substr($firstBits, 4, 4));
if ($opcode === 0x8) { // CLOSE
$this->close();
return;
}
$opcodeName = isset(static::$opcodes[$opcode]) ? static::$opcodes[$opcode] : false;
if (!$opcodeName) {
error_log(get_class($this) . ': Undefined opcode ' . $opcode);
$this->close();
return;
}
$second = ord($this->look(1, 1)); // second byte integer (masked, payload length)
$fin = (bool)($first >> 7);
$isMasked = (bool)($second >> 7);
$dataLength = $second & 0x7f;
$p = 2;
if ($dataLength === 0x7e) { // 2 bytes-length
if ($buflen < $p + 2) {
return; // not enough data yet
}
$dataLength = self::bytes2int($this->look(2, $p), false);
$p += 2;
} elseif ($dataLength === 0x7f) { // 8 bytes-length
if ($buflen < $p + 8) {
return; // not enough data yet
}
$dataLength = self::bytes2int($this->look(8, $p));
$p += 8;
}
if (self::maxAllowedPacket <= $dataLength) {
// Too big packet
$this->close();
return;
}
if ($isMasked) {
if ($buflen < $p + 4) {
return; // not enough data yet
}
$mask = $this->look(4, $p);
$p += 4;
}
if ($buflen < $p + $dataLength) {
return; // not enough data yet
}
$this->drain($p);
$data = $this->read($dataLength);
if ($isMasked) {
$data = $this->mask($data, $mask);
}
//Daemon::log(Debug::dump(array('ext' => $this->extensions, 'rsv1' => $firstBits[1], 'data' => Debug::exportBytes($data))));
/*if ($firstBits[1] && in_array('deflate-frame', $this->extensions)) { // deflate frame
$data = gzuncompress($data, $this->pool->maxAllowedPacket);
}*/
if (!$fin) {
$this->framebuf .= $data;
} else {
$this->on_frame($this->framebuf . $data, $opcodeName);
$this->framebuf = '';
}
}
}
}
}
class ws_v0 extends ws {
const STRING = 0x00;
const BINARY = 0x80;
protected $key;
/**
* Sends a handshake message reply
* @param string Received data (no use in this class)
* @return boolean OK?
*/
public function send_handshake_reply($extraHeaders = '') {
if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
return false;
}
$final_key = $this->_computeFinalKey($this->server['HTTP_SEC_WEBSOCKET_KEY1'], $this->server['HTTP_SEC_WEBSOCKET_KEY2'], $this->key);
$this->key = null;
if (!$final_key) {
return false;
}
if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
}
$this->write("HTTP/1.1 101 Web Socket Protocol Handshakern"
. "Upgrade: WebSocketrn"
. "Connection: Upgradern"
. "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "rn"
. "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "rn");
if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
$this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."rn");
}
$this->write($extraHeaders . "rn" . $final_key);
return true;
}
/**
* Computes final key for Sec-WebSocket.
* @param string Key1
* @param string Key2
* @param string Data
* @return string Result
*/
protected function _computeFinalKey($key1, $key2, $data) {
if (strlen($data) < 8) {
error_log(get_class($this) . '::' . __METHOD__ . ' : Invalid handshake data for client ""'); // $this->addr
return false;
}
return md5($this->_computeKey($key1) . $this->_computeKey($key2) . substr($data, 0, 8), true);
}
/**
* Computes key for Sec-WebSocket.
* @param string Key
* @return string Result
*/
protected function _computeKey($key) {
$spaces = 0;
$digits = '';
for ($i = 0, $s = strlen($key); $i < $s; ++$i) {
$c = substr($key, $i, 1);
if ($c === "x20") {
++$spaces;
} elseif (ctype_digit($c)) {
$digits .= $c;
}
}
if ($spaces > 0) {
$result = (float)floor($digits / $spaces);
} else {
$result = (float)$digits;
}
return pack('N', $result);
}
/**
* Sends a frame.
* @param string $data Frame's data.
* @param string $type Frame's type. ("STRING" OR "BINARY")
* @param callable $cb Optional. Callback called when the frame is received by client.
* @callback $cb ( )
* @return boolean Success.
*/
public function send_frame($data, $type = null, $cb = null) {
if (!$this->handshaked) {
return false;
}
if ($this->closed && $type !== 'CONNCLOSE') {
return false;
}
if ($type === 'CONNCLOSE') {
if ($cb !== null) {
$cb($this);
return true;
}
}
$type = $this->get_frame_type($type);
// Binary
if (($type & self::BINARY) === self::BINARY) {
$n = strlen($data);
$len = '';
$pos = 0;
char:
++$pos;
$c = $n >> 0 & 0x7F;
$n >>= 7;
if ($pos !== 1) {
$c += 0x80;
}
if ($c !== 0x80) {
$len = chr($c) . $len;
goto char;
};
$this->write(chr(self::BINARY) . $len . $data);
}
// String
else {
$this->write(chr(self::STRING) . $data . "xFF");
}
if ($cb !== null) {
$cb();
}
return true;
}
/**
* Called when new data received
* @return void
*/
public function on_read() {
if ($this->state === self::STATE_PREHANDSHAKE) {
if (strlen($this->unparsed_data) < 8) {
return;
}
$this->key = $this->read_unlimited();
$this->handshake();
}
if ($this->state === self::STATE_HANDSHAKED) {
while (($buflen = strlen($this->unparsed_data)) >= 2) {
$hdr = $this->look(10);
$frametype = ord(substr($hdr, 0, 1));
if (($frametype & 0x80) === 0x80) {
$len = 0;
$i = 0;
do {
if ($buflen < $i + 1) {
// not enough data yet
return;
}
$b = ord(substr($hdr, ++$i, 1));
$n = $b & 0x7F;
$len *= 0x80;
$len += $n;
} while ($b > 0x80);
if (self::maxAllowedPacket <= $len) {
// Too big packet
$this->close();
return;
}
if ($buflen < $len + $i + 1) {
// not enough data yet
return;
}
$this->drain($i + 1);
$this->on_frame($this->read($len), 'BINARY');
} else {
if (($p = $this->search("xFF")) !== false) {
if (self::maxAllowedPacket <= $p - 1) {
// Too big packet
$this->close();
return;
}
$this->drain(1);
$data = $this->read($p);
$this->drain(1);
$this->on_frame($data, 'STRING');
} else {
if (self::maxAllowedPacket < $buflen - 1) {
// Too big packet
$this->close();
return;
}
// not enough data yet
return;
}
}
}
}
}
}
class ws_ve extends ws {
const STRING = 0x00;
const BINARY = 0x80;
/**
* Sends a handshake message reply
* @param string Received data (no use in this class)
* @return boolean OK?
*/
public function send_handshake_reply($extraHeaders = '') {
if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
}
$this->write("HTTP/1.1 101 Web Socket Protocol Handshakern"
. "Upgrade: WebSocketrn"
. "Connection: Upgradern"
. "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "rn"
. "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "rn"
);
if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
$this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."rn");
}
$this->write($extraHeaders."rn");
return true;
}
/**
* Computes key for Sec-WebSocket.
* @param string Key
* @return string Result
*/
protected function _computeKey($key) {
$spaces = 0;
$digits = '';
for ($i = 0, $s = strlen($key); $i < $s; ++$i) {
$c = substr($key, $i, 1);
if ($c === "x20") {
++$spaces;
} elseif (ctype_digit($c)) {
$digits .= $c;
}
}
if ($spaces > 0) {
$result = (float)floor($digits / $spaces);
} else {
$result = (float)$digits;
}
return pack('N', $result);
}
/**
* Sends a frame.
* @param string $data Frame's data.
* @param string $type Frame's type. ("STRING" OR "BINARY")
* @param callable $cb Optional. Callback called when the frame is received by client.
* @callback $cb ( )
* @return boolean Success.
*/
public function send_frame($data, $type = null, $cb = null) {
if (!$this->handshaked) {
return false;
}
if ($this->closed && $type !== 'CONNCLOSE') {
return false;
}
if ($type === 'CONNCLOSE') {
if ($cb !== null) {
$cb($this);
return true;
}
}
// Binary
$type = $this->get_frame_type($type);
if (($type & self::BINARY) === self::BINARY) {
$n = strlen($data);
$len = '';
$pos = 0;
char:
++$pos;
$c = $n >> 0 & 0x7F;
$n >>= 7;
if ($pos !== 1) {
$c += 0x80;
}
if ($c !== 0x80) {
$len = chr($c) . $len;
goto char;
};
$this->write(chr(self::BINARY) . $len . $data);
}
// String
else {
$this->write(chr(self::STRING) . $data . "xFF");
}
if ($cb !== null) {
$cb();
}
return true;
}
/**
* Called when new data received
* @return void
*/
public function on_read() {
while (($buflen = strlen($this->unparsed_data)) >= 2) {
$hdr = $this->look(10);
$frametype = ord(substr($hdr, 0, 1));
if (($frametype & 0x80) === 0x80) {
$len = 0;
$i = 0;
do {
if ($buflen < $i + 1) {
return;
}
$b = ord(substr($hdr, ++$i, 1));
$n = $b & 0x7F;
$len *= 0x80;
$len += $n;
} while ($b > 0x80);
if (self::maxAllowedPacket <= $len) {
// Too big packet
$this->close();
return;
}
if ($buflen < $len + $i + 1) {
// not enough data yet
return;
}
$this->drain($i + 1);
$this->on_frame($this->read($len), $frametype);
} else {
if (($p = $this->search("xFF")) !== false) {
if (self::maxAllowedPacket <= $p - 1) {
// Too big packet
$this->close();
return;
}
$this->drain(1);
$data = $this->read($p);
$this->drain(1);
$this->on_frame($data, 'STRING');
} else {
if (self::maxAllowedPacket < $buflen - 1) {
// Too big packet
$this->close();
return;
}
}
}
}
}
}
Сразу хочу отметить, что протокол VE не тестировал — понятия не имею кто его использует. Но добросовестно сконвертировал и урезал код из PhpDeamon.
Протокол V13 используют все нормальные браузеры (FireFox, Opera, Chrome, Яндекс). Даже IE его использует (извините, после IE6 — для меня IE никогда не будет «браузером», даже команда разработчик IE заявляли, что это «не браузер, а тонкий клиент»). Протокол V0 использует браузер «Сафари».
Вместо заключения
Спасибо за внимание, используйте на здоровье весь приведенный выше код (разумеется, я советую завернуть его в нормальные объекты, тут всё упрощено исключительно для понимания. Особенно callback на пришедший от пользователя frame советую сделать по-нормальному). Если вы будете использовать этот код, напишите пожалуйста где-то в коде «Спасибо Anlide и PhpDeamon». В итоге сокет сервер, приведенный тут, совместим со всеми современными браузерами. Работает без утечек памяти и годится для использования в высоконагруженных системах.
Автор: anlide