Всем привет!
Не так давно я начал переписывать один интересный проект, своего рода чат, в котором до моего вмешательства использовался комет-сервер на вебсокеты. Рассказывать о том, что такое WebSockets я не стану, т.к я думаю, что это всем уже известно, да и пост не об этом. Поэтому под катом просто немного PhP-кода.
Да, я знаю, что готовых решений полным-полно, однако меня они по тем или иным причинам не устроили и я решил написать свой PhP WebSocket Server. Плюс ко всему, это интересно :) Так как до этого я не сталкивался с сокетами, чтобы понять на деле, что это за блюдо и с чем его едят, пришлось покурить немного мануалов. Так же я разобрал и изучил принцип работы некоторых готовых решений, что позволило мне лучше узнать их. Сам код я выложу в конце статьи, а перед этим хотелось бы сказать пару слов об его использовании.
Сразу скажу, что это первая версия сервера (так сказать 1.0) и ее функционал еще немного сыроват.
Для того, чтобы запустить наш сервер необходимо сделать следущее:
// Инклудим наш сервер
require_once "./WebSocketApi.php";
//..
$wsApi = new WebSocketApi();
// Запускаем !
$wsApi->startWServer("127.0.0.7", 8000);
Ничего сложного, не правда ли?
В сервере есть 3 event'a.
- onOpen — подключение клиента к серверу
- onMsg — получение сообщения от клиента
- onClose — закрытие соединения с клиентом
Эти event'ы позволяют нам вызывать пользовательские функции при определенных событиях.
Например:
// Инклудим наш сервер
require_once "./WebSocketApi.php";
//..
$wsApi = new WebSocketApi();
// Вызываем наши функции при соединении с клиентом
$wsApi->events['onOpen'] = array(
"myFunction", "myFunction_2"
);
// Запускаем !
$wsApi->startWServer("127.0.0.7", 8000);
Или так:
// Инклудим наш сервер
require_once "./WebSocketApi.php";
//..
$wsApi = new WebSocketApi();
// Вызываем наши функции при соединении с клиентом
$wsApi->events['onMsg'] = array(
"function_1" => array("param_1", "param_2"),
"function_2" => array("param_1")
);
// Запускаем !
$wsApi->startWServer("127.0.0.7", 8000);
А вот и сам код:
<?php
/**
* User: byabuzyak
* Date: 6/23/14
* Time: 9:12 PM
*/
class WebSocketApi {
const MAX_FRAME_RECV = 100000;
const MAX_TIMEOUT = 25;
const TIMEOUT_PONG = 5;
const TIMEOUT_RECV = 10;
const OPCODE_CONTINUATION = 0;
const OPCODE_TEXT = 1;
const OPCODE_BINARY = 2;
const OPCODE_CLOSE = 8;
const OPCODE_PING = 9;
const OPCODE_PONG = 10;
const READY_STATE_CONNECTING = 0;
const READY_STATE_OPEN = 1;
const READY_STATE_CLOSING = 2;
const READY_STATE_CLOSED = 3;
const PAYLOAD_LENGTH_16 = 126;
const PAYLOAD_LENGTH_63 = 127;
const STATUS_PROTOCOL_ERROR = 1002;
const STATUS_MESSAGE_TOO_BIG = 1004;
const FIN = 128;
const MASK = 128;
/**
* @var array
*/
public $clients = array();
/**
* @var array
*/
public $ws = array();
/**
* @var array
*/
public $events = array();
/**
* @var int
*/
public $clientsCount = 0;
/**
* @var null
*/
public static $_instance = null;
/**
* @return WebSocketApi|null
*/
public static function getInstance(){
if(self::$_instance === null){
self::$_instance = new self();
}
return self::$_instance;
}
/**
* @param string $host
* @param int $port
* @return bool
*/
public function startWServer($host = '127.0.0.1', $port = 8000){
if(isset($this->ws[0]))
return false;
if(phpversion() >= 5.5){
cli_set_process_title("WebSockets Server 1.0");
}
if (!$this->ws[0] = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) {
return false;
}
if (!socket_set_option($this->ws[0], SOL_SOCKET, SO_REUSEADDR, 1)) {
socket_close($this->ws[0]);
return false;
}
if (!socket_bind($this->ws[0], $host, $port)) {
socket_close($this->ws[0]);
return false;
}
if (!socket_listen($this->ws[0], 10)) {
socket_close($this->ws[0]);
return false;
}
$write = null;
$except = null;
$nextPingCheck = time() + 1;
while(isset($this->ws[0])){
$read = $this->ws;
$count = socket_select($read, $write, $except, 1);
if($count === false){
socket_close($this->ws[0]);
return false;
}elseif($count > 0){
foreach($read as $clientId => $socket){
if($clientId != 0){
$buffer = '';
$bytes = @socket_recv($socket, $buffer, 4096, 0);
if($bytes === false){
$this->_closeClient($clientId);
}elseif($bytes > 0){
if(!$this->_checkClient($clientId, $buffer, $bytes)){
$this->_closeClient($clientId);
}
}else{
$this->_removeClient($clientId);
}
}else{
$client = socket_accept($this->ws[0]);
if($client !== false){
// TODO: Добавить ограничения по макс. количеству клиентов
$ip = '';
$result = socket_getpeername($client, $ip);
$ip = ip2long($ip);
if($result !== false){
$this->_addClient($client, $ip);
}else{
socket_close($client);
}
}
}
}
}
if(time() >= $nextPingCheck){
$nextPingCheck = time() + 1;
$this->_checkActiveClients();
}
}
return true;
}
/**
*
*/
private function _checkActiveClients(){
$time = time();
foreach($this->clients as $clientId => $socket){
if($socket->state != self::READY_STATE_CLOSED){
if($socket->ready_state !== false){
if($time >= $socket->ready_state + self::TIMEOUT_PONG){
$this->_closeClient($clientId);
$this->_removeClient($clientId);
}
}elseif($time >= $socket->time + self::TIMEOUT_RECV){
if($socket->state != self::READY_STATE_CONNECTING){
$this->clients[$clientId]->ready_state = time();
$this->sendClientMsg($clientId, self::OPCODE_PING, '');
}else{
$this->_removeClient($clientId);
}
}
}
}
}
/**
* @param $socket
* @param string $clientIp
*/
private function _addClient($socket, $clientIp = ''){
$this->clientsCount += 1;
$clientId = $this->_nextId();
$this->clients[$clientId] = (object) array(
'socket' => $socket,
'msg_buffer' => '',
'state' => self::READY_STATE_CONNECTING,
'time' => time(),
'ready_state' => false,
'close_status' => 0,
'client_ip' => $clientIp,
'header_length' => false,
'buffer_length' => 0,
'buffer' => '',
'msg_opcode' => 0,
'msg_data_len' => 0
);
$this->ws[$clientId] = $socket;
}
/**
* @return int
*/
private function _nextId(){
$i = 1;
while(isset($this->ws[$i]))
$i++;
return $i;
}
/**
* @param $clientId
*/
private function _removeClient($clientId){
if(array_key_exists('onClose', $this->events)){
foreach($this->events['onClose'] as $function => $args){
if(is_array($args)){
call_user_func_array($function, $args);
}else{
call_user_func($function, $args);
}
}
}
socket_close($this->clients[$clientId]->socket);
unset($this->ws[$clientId], $this->clients[$clientId]);
$this->clientsCount -= 1;
}
/**
* @param $clientId
* @return bool
*/
private function _closeClient($clientId){
if($this->clients[$clientId]->state == self::READY_STATE_CLOSING || $this->clients[$clientId]->state == self::READY_STATE_CLOSED){
return true;
}
$this->clients[$clientId]->close_status = self::STATUS_PROTOCOL_ERROR;
$this->sendClientMsg($clientId, self::OPCODE_CLOSE, pack('n', self::STATUS_PROTOCOL_ERROR));
$this->clients[$clientId]->state = self::READY_STATE_CLOSING;
}
/**
* @param $clientId
* @param $opCode
* @param $msg
* @return bool
*/
public function sendClientMsg($clientId, $opCode, $msg){
if($this->clients[$clientId]->state == self::READY_STATE_CLOSING || $this->clients[$clientId]->state == self::READY_STATE_CLOSED){
return true;
}
$msgLength = strlen($msg);
$buffSize = 4096;
$frameCount = ceil($msgLength / $buffSize);
if($frameCount == 0){
$frameCount = 1;
}
$maxFrame = $frameCount - 1;
$lastFrameBuffLength = ($msgLength % $buffSize) != 0 ? $msgLength % $buffSize : ($msgLength != 0 ? $buffSize : 0);
for($i=0; $i<$frameCount; $i++){
$final = $i != $maxFrame ? 0 : self::FIN;
$opCode = $i != 0 ? self::OPCODE_CONTINUATION : $opCode;
$buffLength = $i != $maxFrame ? $buffSize : $lastFrameBuffLength;
if($buffLength <= 125){
$payloadLength = $buffLength;
$payloadLengthExt = '';
$payloadLengthExtL = 0;
}elseif($buffLength <= 65535){
$payloadLength = self::PAYLOAD_LENGTH_16;
$payloadLengthExt = pack('n', $buffLength);
$payloadLengthExtL = 2;
}else{
$payloadLength = self::PAYLOAD_LENGTH_63;
$payloadLengthExt = pack('xxxxN', $buffLength);
$payloadLengthExtL = 8;
}
$buffer = pack('n', (($final | $opCode) << 8) | $payloadLength) . $payloadLengthExt . substr($msg, $i * $buffSize, $buffLength);
$socket = $this->clients[$clientId]->socket;
$left = 2 + $payloadLengthExtL + $buffLength;
do{
$sent = @socket_send($socket, $buffer, $left, 0);
if($sent === false){
return false;
}
$left -= $sent;
if($sent > 0){
$buffer = substr($buffer, $sent);
}
}
while($left > 0);
}
return true;
}
/**
* @param $clientId
* @param $buffer
* @param $bLength
* @return bool
*/
private function _checkClient($clientId, &$buffer, $bLength){
if($this->clients[$clientId]->state == self::READY_STATE_OPEN){
$result = $this->_buildClientFrame($clientId, $buffer, $bLength);
}elseif($this->clients[$clientId]->state == self::READY_STATE_CONNECTING){
$result = $this->_makeHandShake($clientId, $buffer);
if($result){
$this->clients[$clientId]->state = self::READY_STATE_OPEN;
if(array_key_exists('onOpen', $this->events)){
foreach($this->events['onOpen'] as $function => $args){
if(is_array($args)){
call_user_func_array($function, $args);
}else{
call_user_func($function, $args);
}
}
}
}
}else{
$result = false;
}
return $result;
}
/**
* @param $clientId
* @param $buffer
* @param $bufferLength
* @return bool
*/
private function _buildClientFrame($clientId, &$buffer, $bufferLength){
$this->clients[$clientId]->buffer_length += $bufferLength;
$this->clients[$clientId]->buffer .= $buffer;
if($this->clients[$clientId]->header_length !== false || $this->_checkSizeClientFrame($clientId) == true){
$headerLength = ($this->clients[$clientId]->header_length <= 125 ? 0 : ($this->clients[$clientId]->header_length <= 65535 ? 2 : 8)) + 6;
$frameLength = $this->clients[$clientId]->header_length + $headerLength;
if($this->clients[$clientId]->buffer_length >= $frameLength){
$nextFrameLength = $this->clients[$clientId]->buffer_length - $frameLength;
if($nextFrameLength > 0){
$this->clients[$clientId]->buffer_length -= $nextFrameLength;
$nextFrameBytes = substr($this->clients[$clientId]->buffer, $frameLength);
$this->clients[$clientId]->buffer = substr($this->clients[$clientId]->buffer, 0, $frameLength);
}
$result = $this->_processClientFrame($clientId);
if(isset($this->clients[$clientId])){
$this->clients[$clientId]->header_length = false;
$this->clients[$clientId]->buffer_length = 0;
$this->clients[$clientId]->buffer = '';
}
if($nextFrameLength <= 0 || !$result){
return $result;
}
return $this->_buildClientFrame($clientId, $nextFrameBytes, $nextFrameLength);
}
}
return true;
}
/**
* @param $clientId
* @return bool
*/
private function _processClientFrame($clientId){
$this->clients[$clientId]->time = time();
$buffer = &$this->clients[$clientId]->buffer;
if(substr($buffer, 5, 1) === false)
return false;
$a1 = ord(substr($buffer, 0, 1));
$a2 = ord(substr($buffer, 1, 1));
$f = $a1 & self::FIN;
$opCode = $a1 & 15;
$mask = $a2 & self::MASK;
if(!$mask)
return false;
$seek = $this->clients[$clientId]->header_length <= 125 ? 2 : ($this->clients[$clientId]->header_length <= 65535 ? 4 : 10);
$maskKey = substr($buffer, $seek, 4);
$array = unpack('Na', $maskKey);
$maskKey = $array['a'];
$maskKey = array(
$maskKey >> 24,
($maskKey >> 16) & 255,
($maskKey >> 8) & 255,
$maskKey & 255
);
$seek += 4;
if (substr($buffer, $seek, 1) !== false) {
$data = str_split(substr($buffer, $seek));
foreach($data as $key => $byte){
$data[$key] = chr(ord($byte) ^ ($maskKey[$key % 4]));
}
$data = implode('', $data);
}else{
$data = '';
}
if ($opCode != self::OPCODE_CONTINUATION && $this->clients[$clientId]->msg_data_len > 0) {
$this->clients[$clientId]->msg_data_len = 0;
$this->clients[$clientId]->msg_buffer = '';
}
if($f == self::FIN){
if($opCode != self::OPCODE_CONTINUATION){
return
$this->_processClientMsg($clientId, $opCode, $data, $this->clients[$clientId]->header_length);
}else{
$this->clients[$clientId]->msg_data_len += $this->clients[$clientId]->header_length;
$this->clients[$clientId]->msg_buffer .= $data;
$result = $this->_processClientMsg($clientId, $this->clients[$clientId]->msg_opcode, $this->clients[$clientId]->msg_buffer, $this->clients[$clientId]->msg_data_len);
if(isset($this->clients[$clientId])){
$this->clients[$clientId]->msg_buffer = '';
$this->clients[$clientId]->msg_opcode = 0;
$this->clients[$clientId]->msg_data_len = 0;
}
return $result;
}
}else{
if($opCode & 8)
return false;
$this->clients[$clientId]->msg_data_len += $this->clients[$clientId]->header_length;
$this->clients[$clientId]->msg_buffer .= $data;
if($opCode != self::OPCODE_CONTINUATION){
$this->clients[$clientId]->msg_opcode = $opCode;
}
}
return true;
}
/**
* @param $clientId
* @param $opCode
* @param $data
* @param $dataLength
* @return bool
*/
private function _processClientMsg($clientId, $opCode, &$data, $dataLength){
if($opCode == self::OPCODE_PING){
$this->sendClientMsg($clientId, self::OPCODE_PONG, $data);
}elseif($opCode == self::OPCODE_PONG){
if($this->clients[$clientId]->ready_state !== false){
$this->clients[$clientId]->ready_state = false;
}
}elseif($opCode == self::OPCODE_CLOSE){
if($this->clients[$clientId]->state == self::READY_STATE_CLOSING){
$this->clients[$clientId]->state = self::READY_STATE_CLOSED;
}else{
// TODO: добавить типы закрытия
$this->_closeClient($clientId);
}
$this->_removeClient($clientId);
}elseif($opCode == self::OPCODE_TEXT || $opCode == self::OPCODE_BINARY){
if(array_key_exists('onMsg', $this->events)){
foreach($this->events['onMsg'] as $function => $args){
if(is_array($args)){
call_user_func_array($function, $args);
}else{
call_user_func($function, $args);
}
}
}
}else{
return false;
}
return true;
}
/**
* @param $clientId
* @return bool
*/
private function _checkSizeClientFrame($clientId){
if($this->clients[$clientId]->buffer_length > 1){
$payloadLength = ord(substr($this->clients[$clientId]->buffer, 1, 1)) & 127;
if ($payloadLength <= 125) {
$this->clients[$clientId]->header_length = $payloadLength;
}elseif($payloadLength == 126){
if (substr($this->clients[$clientId]->buffer, 3, 1) !== false) {
$payloadLengthExtended = substr($this->clients[$clientId]->buffer, 2, 2);
$array = unpack('na', $payloadLengthExtended);
$this->clients[$clientId]->header_length = $array['a'];
}
}else{
if (substr($this->clients[$clientId]->buffer, 9, 1) !== false) {
$payloadLengthExtended = substr($this->clients[$clientId]->buffer, 2, 8);
$payloadLengthExtended32_1 = substr($payloadLengthExtended, 0, 4);
$array = unpack('Na', $payloadLengthExtended32_1);
if ($array['a'] != 0 || ord(substr($payloadLengthExtended, 4, 1)) & 128) {
$this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);
return false;
}
$payloadLengthExtended32_2 = substr($payloadLengthExtended, 4, 4);
$array = unpack('Na', $payloadLengthExtended32_2);
if ($array['a'] > 2147479538) {
$this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);
return false;
}
$this->clients[$clientId]->header_length = $array['a'];
}
}
if ($this->clients[$clientId]->header_length !== false) {
if ($this->clients[$clientId]->header_length > self::MAX_FRAME_RECV) {
$this->clients[$clientId]->header_length = false;
$this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);
return false;
}
$controlFrame = (ord(substr($this->clients[$clientId]->buffer, 0, 1)) & 8) == 8;
if (!$controlFrame) {
$newMessagePayloadLength = $this->clients[$clientId]->msg_data_len + $this->clients[$clientId]->header_length;
if ($newMessagePayloadLength > self::MAX_FRAME_RECV || $newMessagePayloadLength > 2147483647) {
$this->_closeClient($clientId, self::STATUS_MESSAGE_TOO_BIG);
return false;
}
}
return true;
}
}
return false;
}
/**
* @param $clientId
* @param $buffer
* @return bool
*/
private function _makeHandShake($clientId, $buffer){
$sep = strpos($buffer, "rnrn");
if (!$sep) return false;
$headers = explode("rn", substr($buffer, 0, $sep));
$headersCount = sizeof($headers);
if ($headersCount < 1)
return false;
$request = &$headers[0];
$requestParts = explode(' ', $request);
$requestPartsSize = sizeof($requestParts);
if ($requestPartsSize < 3)
return false;
if (strtoupper($requestParts[0]) != 'GET')
return false;
$httpPart = &$requestParts[$requestPartsSize - 1];
$httpParts = explode('/', $httpPart);
if (!isset($httpParts[1]) || (float) $httpParts[1] < 1.1)
return false;
$headersKeyed = array();
for ($i=1; $i<$headersCount; $i++) {
$parts = explode(':', $headers[$i]);
if (!isset($parts[1]))
return false;
$headersKeyed[trim($parts[0])] = trim($parts[1]);
}
if (!isset($headersKeyed['Host']))
return false;
if (!isset($headersKeyed['Sec-WebSocket-Key']))
return false;
$key = $headersKeyed['Sec-WebSocket-Key'];
if (strlen(base64_decode($key)) != 16)
return false;
if (!isset($headersKeyed['Sec-WebSocket-Version']) || (int) $headersKeyed['Sec-WebSocket-Version'] < 7)
return false;
$hash = base64_encode(sha1($key.'258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));
$headers = array(
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
'Sec-WebSocket-Accept: '.$hash
);
$headers = implode("rn", $headers)."rnrn";
$socket = $this->clients[$clientId]->socket;
$left = strlen($headers);
do{
$sent = @socket_send($socket, $headers, $left, 0);
if($sent === false)
return false;
$left -= $sent;
if($sent > 0)
$headers = substr($headers, $sent);
}
while($left > 0);
return true;
}
}
Чуть позже создам репозиторий на гитхабе и выложу туда. В дальнейшем планирую поддерживать и развивать.
Хотелось бы услышать от Вас уважаемых советы, пожелания или замечания.
Спасибо за внимание!
P.S в планах реализация интересных плюшек. Просто пока что не хватает времени на них.
Автор: byabuzyak