Websocket.php 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Protocols;
  15. use Workerman\Connection\ConnectionInterface;
  16. use Workerman\Connection\TcpConnection;
  17. use Workerman\Worker;
  18. /**
  19. * WebSocket protocol.
  20. */
  21. class Websocket implements \Workerman\Protocols\ProtocolInterface
  22. {
  23. /**
  24. * Websocket blob type.
  25. *
  26. * @var string
  27. */
  28. const BINARY_TYPE_BLOB = "\x81";
  29. /**
  30. * Websocket arraybuffer type.
  31. *
  32. * @var string
  33. */
  34. const BINARY_TYPE_ARRAYBUFFER = "\x82";
  35. /**
  36. * Check the integrity of the package.
  37. *
  38. * @param string $buffer
  39. * @param ConnectionInterface $connection
  40. * @return int
  41. */
  42. public static function input($buffer, ConnectionInterface $connection)
  43. {
  44. // Receive length.
  45. $recv_len = \strlen($buffer);
  46. // We need more data.
  47. if ($recv_len < 6) {
  48. return 0;
  49. }
  50. // Has not yet completed the handshake.
  51. if (empty($connection->websocketHandshake)) {
  52. return static::dealHandshake($buffer, $connection);
  53. }
  54. // Buffer websocket frame data.
  55. if ($connection->websocketCurrentFrameLength) {
  56. // We need more frame data.
  57. if ($connection->websocketCurrentFrameLength > $recv_len) {
  58. // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
  59. return 0;
  60. }
  61. } else {
  62. $firstbyte = \ord($buffer[0]);
  63. $secondbyte = \ord($buffer[1]);
  64. $data_len = $secondbyte & 127;
  65. $is_fin_frame = $firstbyte >> 7;
  66. $masked = $secondbyte >> 7;
  67. if (!$masked) {
  68. Worker::safeEcho("frame not masked so close the connection\n");
  69. $connection->close();
  70. return 0;
  71. }
  72. $opcode = $firstbyte & 0xf;
  73. switch ($opcode) {
  74. case 0x0:
  75. break;
  76. // Blob type.
  77. case 0x1:
  78. break;
  79. // Arraybuffer type.
  80. case 0x2:
  81. break;
  82. // Close package.
  83. case 0x8:
  84. // Try to emit onWebSocketClose callback.
  85. if (isset($connection->onWebSocketClose) || isset($connection->worker->onWebSocketClose)) {
  86. try {
  87. \call_user_func(isset($connection->onWebSocketClose)?$connection->onWebSocketClose:$connection->worker->onWebSocketClose, $connection);
  88. } catch (\Exception $e) {
  89. Worker::log($e);
  90. exit(250);
  91. } catch (\Error $e) {
  92. Worker::log($e);
  93. exit(250);
  94. }
  95. } // Close connection.
  96. else {
  97. $connection->close("\x88\x02\x03\xe8", true);
  98. }
  99. return 0;
  100. // Ping package.
  101. case 0x9:
  102. break;
  103. // Pong package.
  104. case 0xa:
  105. break;
  106. // Wrong opcode.
  107. default :
  108. Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n");
  109. $connection->close();
  110. return 0;
  111. }
  112. // Calculate packet length.
  113. $head_len = 6;
  114. if ($data_len === 126) {
  115. $head_len = 8;
  116. if ($head_len > $recv_len) {
  117. return 0;
  118. }
  119. $pack = \unpack('nn/ntotal_len', $buffer);
  120. $data_len = $pack['total_len'];
  121. } else {
  122. if ($data_len === 127) {
  123. $head_len = 14;
  124. if ($head_len > $recv_len) {
  125. return 0;
  126. }
  127. $arr = \unpack('n/N2c', $buffer);
  128. $data_len = $arr['c1']*4294967296 + $arr['c2'];
  129. }
  130. }
  131. $current_frame_length = $head_len + $data_len;
  132. $total_package_size = \strlen($connection->websocketDataBuffer) + $current_frame_length;
  133. if ($total_package_size > $connection->maxPackageSize) {
  134. Worker::safeEcho("error package. package_length=$total_package_size\n");
  135. $connection->close();
  136. return 0;
  137. }
  138. if ($is_fin_frame) {
  139. if ($opcode === 0x9) {
  140. if ($recv_len >= $current_frame_length) {
  141. $ping_data = static::decode(\substr($buffer, 0, $current_frame_length), $connection);
  142. $connection->consumeRecvBuffer($current_frame_length);
  143. $tmp_connection_type = isset($connection->websocketType) ? $connection->websocketType : static::BINARY_TYPE_BLOB;
  144. $connection->websocketType = "\x8a";
  145. if (isset($connection->onWebSocketPing) || isset($connection->worker->onWebSocketPing)) {
  146. try {
  147. \call_user_func(isset($connection->onWebSocketPing)?$connection->onWebSocketPing:$connection->worker->onWebSocketPing, $connection, $ping_data);
  148. } catch (\Exception $e) {
  149. Worker::log($e);
  150. exit(250);
  151. } catch (\Error $e) {
  152. Worker::log($e);
  153. exit(250);
  154. }
  155. } else {
  156. $connection->send($ping_data);
  157. }
  158. $connection->websocketType = $tmp_connection_type;
  159. if ($recv_len > $current_frame_length) {
  160. return static::input(\substr($buffer, $current_frame_length), $connection);
  161. }
  162. }
  163. return 0;
  164. } else if ($opcode === 0xa) {
  165. if ($recv_len >= $current_frame_length) {
  166. $pong_data = static::decode(\substr($buffer, 0, $current_frame_length), $connection);
  167. $connection->consumeRecvBuffer($current_frame_length);
  168. $tmp_connection_type = isset($connection->websocketType) ? $connection->websocketType : static::BINARY_TYPE_BLOB;
  169. $connection->websocketType = "\x8a";
  170. // Try to emit onWebSocketPong callback.
  171. if (isset($connection->onWebSocketPong) || isset($connection->worker->onWebSocketPong)) {
  172. try {
  173. \call_user_func(isset($connection->onWebSocketPong)?$connection->onWebSocketPong:$connection->worker->onWebSocketPong, $connection, $pong_data);
  174. } catch (\Exception $e) {
  175. Worker::log($e);
  176. exit(250);
  177. } catch (\Error $e) {
  178. Worker::log($e);
  179. exit(250);
  180. }
  181. }
  182. $connection->websocketType = $tmp_connection_type;
  183. if ($recv_len > $current_frame_length) {
  184. return static::input(\substr($buffer, $current_frame_length), $connection);
  185. }
  186. }
  187. return 0;
  188. }
  189. return $current_frame_length;
  190. } else {
  191. $connection->websocketCurrentFrameLength = $current_frame_length;
  192. }
  193. }
  194. // Received just a frame length data.
  195. if ($connection->websocketCurrentFrameLength === $recv_len) {
  196. static::decode($buffer, $connection);
  197. $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
  198. $connection->websocketCurrentFrameLength = 0;
  199. return 0;
  200. } // The length of the received data is greater than the length of a frame.
  201. elseif ($connection->websocketCurrentFrameLength < $recv_len) {
  202. static::decode(\substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
  203. $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
  204. $current_frame_length = $connection->websocketCurrentFrameLength;
  205. $connection->websocketCurrentFrameLength = 0;
  206. // Continue to read next frame.
  207. return static::input(\substr($buffer, $current_frame_length), $connection);
  208. } // The length of the received data is less than the length of a frame.
  209. else {
  210. return 0;
  211. }
  212. }
  213. /**
  214. * Websocket encode.
  215. *
  216. * @param string $buffer
  217. * @param ConnectionInterface $connection
  218. * @return string
  219. */
  220. public static function encode($buffer, ConnectionInterface $connection)
  221. {
  222. if (!is_scalar($buffer)) {
  223. throw new \Exception("You can't send(" . \gettype($buffer) . ") to client, you need to convert it to a string. ");
  224. }
  225. $len = \strlen($buffer);
  226. if (empty($connection->websocketType)) {
  227. $connection->websocketType = static::BINARY_TYPE_BLOB;
  228. }
  229. $first_byte = $connection->websocketType;
  230. if ($len <= 125) {
  231. $encode_buffer = $first_byte . \chr($len) . $buffer;
  232. } else {
  233. if ($len <= 65535) {
  234. $encode_buffer = $first_byte . \chr(126) . \pack("n", $len) . $buffer;
  235. } else {
  236. $encode_buffer = $first_byte . \chr(127) . \pack("xxxxN", $len) . $buffer;
  237. }
  238. }
  239. // Handshake not completed so temporary buffer websocket data waiting for send.
  240. if (empty($connection->websocketHandshake)) {
  241. if (empty($connection->tmpWebsocketData)) {
  242. $connection->tmpWebsocketData = '';
  243. }
  244. // If buffer has already full then discard the current package.
  245. if (\strlen($connection->tmpWebsocketData) > $connection->maxSendBufferSize) {
  246. if ($connection->onError) {
  247. try {
  248. \call_user_func($connection->onError, $connection, \WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
  249. } catch (\Exception $e) {
  250. Worker::log($e);
  251. exit(250);
  252. } catch (\Error $e) {
  253. Worker::log($e);
  254. exit(250);
  255. }
  256. }
  257. return '';
  258. }
  259. $connection->tmpWebsocketData .= $encode_buffer;
  260. // Check buffer is full.
  261. if ($connection->maxSendBufferSize <= \strlen($connection->tmpWebsocketData)) {
  262. if ($connection->onBufferFull) {
  263. try {
  264. \call_user_func($connection->onBufferFull, $connection);
  265. } catch (\Exception $e) {
  266. Worker::log($e);
  267. exit(250);
  268. } catch (\Error $e) {
  269. Worker::log($e);
  270. exit(250);
  271. }
  272. }
  273. }
  274. // Return empty string.
  275. return '';
  276. }
  277. return $encode_buffer;
  278. }
  279. /**
  280. * Websocket decode.
  281. *
  282. * @param string $buffer
  283. * @param ConnectionInterface $connection
  284. * @return string
  285. */
  286. public static function decode($buffer, ConnectionInterface $connection)
  287. {
  288. $len = \ord($buffer[1]) & 127;
  289. if ($len === 126) {
  290. $masks = \substr($buffer, 4, 4);
  291. $data = \substr($buffer, 8);
  292. } else {
  293. if ($len === 127) {
  294. $masks = \substr($buffer, 10, 4);
  295. $data = \substr($buffer, 14);
  296. } else {
  297. $masks = \substr($buffer, 2, 4);
  298. $data = \substr($buffer, 6);
  299. }
  300. }
  301. $dataLength = \strlen($data);
  302. $masks = \str_repeat($masks, \floor($dataLength / 4)) . \substr($masks, 0, $dataLength % 4);
  303. $decoded = $data ^ $masks;
  304. if ($connection->websocketCurrentFrameLength) {
  305. $connection->websocketDataBuffer .= $decoded;
  306. return $connection->websocketDataBuffer;
  307. } else {
  308. if ($connection->websocketDataBuffer !== '') {
  309. $decoded = $connection->websocketDataBuffer . $decoded;
  310. $connection->websocketDataBuffer = '';
  311. }
  312. return $decoded;
  313. }
  314. }
  315. /**
  316. * Websocket handshake.
  317. *
  318. * @param string $buffer
  319. * @param TcpConnection $connection
  320. * @return int
  321. */
  322. public static function dealHandshake($buffer, TcpConnection $connection)
  323. {
  324. // HTTP protocol.
  325. if (0 === \strpos($buffer, 'GET')) {
  326. // Find \r\n\r\n.
  327. $heder_end_pos = \strpos($buffer, "\r\n\r\n");
  328. if (!$heder_end_pos) {
  329. return 0;
  330. }
  331. $header_length = $heder_end_pos + 4;
  332. // Get Sec-WebSocket-Key.
  333. $Sec_WebSocket_Key = '';
  334. if (\preg_match("/Sec-WebSocket-Key: *(.*?)\r\n/i", $buffer, $match)) {
  335. $Sec_WebSocket_Key = $match[1];
  336. } else {
  337. $connection->send("HTTP/1.1 200 Websocket\r\nServer: workerman/".Worker::VERSION."\r\n\r\n<div style=\"text-align:center\"><h1>Websocket</h1><hr>powered by <a href=\"https://www.workerman.net\">workerman ".Worker::VERSION."</a></div>",
  338. true);
  339. $connection->close();
  340. return 0;
  341. }
  342. // Calculation websocket key.
  343. $new_key = \base64_encode(\sha1($Sec_WebSocket_Key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));
  344. // Handshake response data.
  345. $handshake_message = "HTTP/1.1 101 Switching Protocols\r\n"
  346. ."Upgrade: websocket\r\n"
  347. ."Sec-WebSocket-Version: 13\r\n"
  348. ."Connection: Upgrade\r\n"
  349. ."Sec-WebSocket-Accept: " . $new_key . "\r\n";
  350. // Websocket data buffer.
  351. $connection->websocketDataBuffer = '';
  352. // Current websocket frame length.
  353. $connection->websocketCurrentFrameLength = 0;
  354. // Current websocket frame data.
  355. $connection->websocketCurrentFrameBuffer = '';
  356. // Consume handshake data.
  357. $connection->consumeRecvBuffer($header_length);
  358. // blob or arraybuffer
  359. if (empty($connection->websocketType)) {
  360. $connection->websocketType = static::BINARY_TYPE_BLOB;
  361. }
  362. $has_server_header = false;
  363. // Try to emit onWebSocketConnect callback.
  364. if (isset($connection->onWebSocketConnect) || isset($connection->worker->onWebSocketConnect)) {
  365. static::parseHttpHeader($buffer);
  366. try {
  367. \call_user_func(isset($connection->onWebSocketConnect)?$connection->onWebSocketConnect:$connection->worker->onWebSocketConnect, $connection, $buffer);
  368. } catch (\Exception $e) {
  369. Worker::log($e);
  370. exit(250);
  371. } catch (\Error $e) {
  372. Worker::log($e);
  373. exit(250);
  374. }
  375. if (!empty($_SESSION) && \class_exists('\GatewayWorker\Lib\Context')) {
  376. $connection->session = \GatewayWorker\Lib\Context::sessionEncode($_SESSION);
  377. }
  378. $_GET = $_SERVER = $_SESSION = $_COOKIE = array();
  379. if (isset($connection->headers)) {
  380. if (\is_array($connection->headers)) {
  381. foreach ($connection->headers as $header) {
  382. if (\strpos($header, 'Server:') === 0) {
  383. $has_server_header = true;
  384. }
  385. $handshake_message .= "$header\r\n";
  386. }
  387. } else {
  388. $handshake_message .= "$connection->headers\r\n";
  389. }
  390. }
  391. }
  392. if (!$has_server_header) {
  393. $handshake_message .= "Server: workerman/".Worker::VERSION."\r\n";
  394. }
  395. $handshake_message .= "\r\n";
  396. // Send handshake response.
  397. $connection->send($handshake_message, true);
  398. // Mark handshake complete..
  399. $connection->websocketHandshake = true;
  400. // There are data waiting to be sent.
  401. if (!empty($connection->tmpWebsocketData)) {
  402. $connection->send($connection->tmpWebsocketData, true);
  403. $connection->tmpWebsocketData = '';
  404. }
  405. if (\strlen($buffer) > $header_length) {
  406. return static::input(\substr($buffer, $header_length), $connection);
  407. }
  408. return 0;
  409. } // Is flash policy-file-request.
  410. elseif (0 === \strpos($buffer, '<polic')) {
  411. $policy_xml = '<?xml version="1.0"?><cross-domain-policy><site-control permitted-cross-domain-policies="all"/><allow-access-from domain="*" to-ports="*"/></cross-domain-policy>' . "\0";
  412. $connection->send($policy_xml, true);
  413. $connection->consumeRecvBuffer(\strlen($buffer));
  414. return 0;
  415. }
  416. // Bad websocket handshake request.
  417. $connection->send("HTTP/1.1 200 Websocket\r\nServer: workerman/".Worker::VERSION."\r\n\r\n<div style=\"text-align:center\"><h1>Websocket</h1><hr>powered by <a href=\"https://www.workerman.net\">workerman ".Worker::VERSION."</a></div>",
  418. true);
  419. $connection->close();
  420. return 0;
  421. }
  422. /**
  423. * Parse http header.
  424. *
  425. * @param string $buffer
  426. * @return void
  427. */
  428. protected static function parseHttpHeader($buffer)
  429. {
  430. // Parse headers.
  431. list($http_header, ) = \explode("\r\n\r\n", $buffer, 2);
  432. $header_data = \explode("\r\n", $http_header);
  433. if ($_SERVER) {
  434. $_SERVER = array();
  435. }
  436. list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = \explode(' ',
  437. $header_data[0]);
  438. unset($header_data[0]);
  439. foreach ($header_data as $content) {
  440. // \r\n\r\n
  441. if (empty($content)) {
  442. continue;
  443. }
  444. list($key, $value) = \explode(':', $content, 2);
  445. $key = \str_replace('-', '_', \strtoupper($key));
  446. $value = \trim($value);
  447. $_SERVER['HTTP_' . $key] = $value;
  448. switch ($key) {
  449. // HTTP_HOST
  450. case 'HOST':
  451. $tmp = \explode(':', $value);
  452. $_SERVER['SERVER_NAME'] = $tmp[0];
  453. if (isset($tmp[1])) {
  454. $_SERVER['SERVER_PORT'] = $tmp[1];
  455. }
  456. break;
  457. // cookie
  458. case 'COOKIE':
  459. \parse_str(\str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);
  460. break;
  461. }
  462. }
  463. // QUERY_STRING
  464. $_SERVER['QUERY_STRING'] = \parse_url($_SERVER['REQUEST_URI'], \PHP_URL_QUERY);
  465. if ($_SERVER['QUERY_STRING']) {
  466. // $GET
  467. \parse_str($_SERVER['QUERY_STRING'], $_GET);
  468. } else {
  469. $_SERVER['QUERY_STRING'] = '';
  470. }
  471. }
  472. }