// receiver.js
  1. 'use strict';
  2. const { Writable } = require('stream');
  3. const PerMessageDeflate = require('./permessage-deflate');
  4. const {
  5. BINARY_TYPES,
  6. EMPTY_BUFFER,
  7. kStatusCode,
  8. kWebSocket
  9. } = require('./constants');
  10. const { concat, toArrayBuffer, unmask } = require('./buffer-util');
  11. const { isValidStatusCode, isValidUTF8 } = require('./validation');
  12. const FastBuffer = Buffer[Symbol.species];
  13. const GET_INFO = 0;
  14. const GET_PAYLOAD_LENGTH_16 = 1;
  15. const GET_PAYLOAD_LENGTH_64 = 2;
  16. const GET_MASK = 3;
  17. const GET_DATA = 4;
  18. const INFLATING = 5;
  19. /**
  20. * HyBi Receiver implementation.
  21. *
  22. * @extends Writable
  23. */
  24. class Receiver extends Writable {
  25. /**
  26. * Creates a Receiver instance.
  27. *
  28. * @param {Object} [options] Options object
  29. * @param {String} [options.binaryType=nodebuffer] The type for binary data
  30. * @param {Object} [options.extensions] An object containing the negotiated
  31. * extensions
  32. * @param {Boolean} [options.isServer=false] Specifies whether to operate in
  33. * client or server mode
  34. * @param {Number} [options.maxPayload=0] The maximum allowed message length
  35. * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
  36. * not to skip UTF-8 validation for text and close messages
  37. */
  38. constructor(options = {}) {
  39. super();
  40. this._binaryType = options.binaryType || BINARY_TYPES[0];
  41. this._extensions = options.extensions || {};
  42. this._isServer = !!options.isServer;
  43. this._maxPayload = options.maxPayload | 0;
  44. this._skipUTF8Validation = !!options.skipUTF8Validation;
  45. this[kWebSocket] = undefined;
  46. this._bufferedBytes = 0;
  47. this._buffers = [];
  48. this._compressed = false;
  49. this._payloadLength = 0;
  50. this._mask = undefined;
  51. this._fragmented = 0;
  52. this._masked = false;
  53. this._fin = false;
  54. this._opcode = 0;
  55. this._totalPayloadLength = 0;
  56. this._messageLength = 0;
  57. this._fragments = [];
  58. this._state = GET_INFO;
  59. this._loop = false;
  60. }
  61. /**
  62. * Implements `Writable.prototype._write()`.
  63. *
  64. * @param {Buffer} chunk The chunk of data to write
  65. * @param {String} encoding The character encoding of `chunk`
  66. * @param {Function} cb Callback
  67. * @private
  68. */
  69. _write(chunk, encoding, cb) {
  70. if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
  71. this._bufferedBytes += chunk.length;
  72. this._buffers.push(chunk);
  73. this.startLoop(cb);
  74. }
  75. /**
  76. * Consumes `n` bytes from the buffered data.
  77. *
  78. * @param {Number} n The number of bytes to consume
  79. * @return {Buffer} The consumed bytes
  80. * @private
  81. */
  82. consume(n) {
  83. this._bufferedBytes -= n;
  84. if (n === this._buffers[0].length) return this._buffers.shift();
  85. if (n < this._buffers[0].length) {
  86. const buf = this._buffers[0];
  87. this._buffers[0] = new FastBuffer(
  88. buf.buffer,
  89. buf.byteOffset + n,
  90. buf.length - n
  91. );
  92. return new FastBuffer(buf.buffer, buf.byteOffset, n);
  93. }
  94. const dst = Buffer.allocUnsafe(n);
  95. do {
  96. const buf = this._buffers[0];
  97. const offset = dst.length - n;
  98. if (n >= buf.length) {
  99. dst.set(this._buffers.shift(), offset);
  100. } else {
  101. dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
  102. this._buffers[0] = new FastBuffer(
  103. buf.buffer,
  104. buf.byteOffset + n,
  105. buf.length - n
  106. );
  107. }
  108. n -= buf.length;
  109. } while (n > 0);
  110. return dst;
  111. }
  112. /**
  113. * Starts the parsing loop.
  114. *
  115. * @param {Function} cb Callback
  116. * @private
  117. */
  118. startLoop(cb) {
  119. let err;
  120. this._loop = true;
  121. do {
  122. switch (this._state) {
  123. case GET_INFO:
  124. err = this.getInfo();
  125. break;
  126. case GET_PAYLOAD_LENGTH_16:
  127. err = this.getPayloadLength16();
  128. break;
  129. case GET_PAYLOAD_LENGTH_64:
  130. err = this.getPayloadLength64();
  131. break;
  132. case GET_MASK:
  133. this.getMask();
  134. break;
  135. case GET_DATA:
  136. err = this.getData(cb);
  137. break;
  138. default:
  139. // `INFLATING`
  140. this._loop = false;
  141. return;
  142. }
  143. } while (this._loop);
  144. cb(err);
  145. }
  146. /**
  147. * Reads the first two bytes of a frame.
  148. *
  149. * @return {(RangeError|undefined)} A possible error
  150. * @private
  151. */
  152. getInfo() {
  153. if (this._bufferedBytes < 2) {
  154. this._loop = false;
  155. return;
  156. }
  157. const buf = this.consume(2);
  158. if ((buf[0] & 0x30) !== 0x00) {
  159. this._loop = false;
  160. return error(
  161. RangeError,
  162. 'RSV2 and RSV3 must be clear',
  163. true,
  164. 1002,
  165. 'WS_ERR_UNEXPECTED_RSV_2_3'
  166. );
  167. }
  168. const compressed = (buf[0] & 0x40) === 0x40;
  169. if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
  170. this._loop = false;
  171. return error(
  172. RangeError,
  173. 'RSV1 must be clear',
  174. true,
  175. 1002,
  176. 'WS_ERR_UNEXPECTED_RSV_1'
  177. );
  178. }
  179. this._fin = (buf[0] & 0x80) === 0x80;
  180. this._opcode = buf[0] & 0x0f;
  181. this._payloadLength = buf[1] & 0x7f;
  182. if (this._opcode === 0x00) {
  183. if (compressed) {
  184. this._loop = false;
  185. return error(
  186. RangeError,
  187. 'RSV1 must be clear',
  188. true,
  189. 1002,
  190. 'WS_ERR_UNEXPECTED_RSV_1'
  191. );
  192. }
  193. if (!this._fragmented) {
  194. this._loop = false;
  195. return error(
  196. RangeError,
  197. 'invalid opcode 0',
  198. true,
  199. 1002,
  200. 'WS_ERR_INVALID_OPCODE'
  201. );
  202. }
  203. this._opcode = this._fragmented;
  204. } else if (this._opcode === 0x01 || this._opcode === 0x02) {
  205. if (this._fragmented) {
  206. this._loop = false;
  207. return error(
  208. RangeError,
  209. `invalid opcode ${this._opcode}`,
  210. true,
  211. 1002,
  212. 'WS_ERR_INVALID_OPCODE'
  213. );
  214. }
  215. this._compressed = compressed;
  216. } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
  217. if (!this._fin) {
  218. this._loop = false;
  219. return error(
  220. RangeError,
  221. 'FIN must be set',
  222. true,
  223. 1002,
  224. 'WS_ERR_EXPECTED_FIN'
  225. );
  226. }
  227. if (compressed) {
  228. this._loop = false;
  229. return error(
  230. RangeError,
  231. 'RSV1 must be clear',
  232. true,
  233. 1002,
  234. 'WS_ERR_UNEXPECTED_RSV_1'
  235. );
  236. }
  237. if (
  238. this._payloadLength > 0x7d ||
  239. (this._opcode === 0x08 && this._payloadLength === 1)
  240. ) {
  241. this._loop = false;
  242. return error(
  243. RangeError,
  244. `invalid payload length ${this._payloadLength}`,
  245. true,
  246. 1002,
  247. 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
  248. );
  249. }
  250. } else {
  251. this._loop = false;
  252. return error(
  253. RangeError,
  254. `invalid opcode ${this._opcode}`,
  255. true,
  256. 1002,
  257. 'WS_ERR_INVALID_OPCODE'
  258. );
  259. }
  260. if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
  261. this._masked = (buf[1] & 0x80) === 0x80;
  262. if (this._isServer) {
  263. if (!this._masked) {
  264. this._loop = false;
  265. return error(
  266. RangeError,
  267. 'MASK must be set',
  268. true,
  269. 1002,
  270. 'WS_ERR_EXPECTED_MASK'
  271. );
  272. }
  273. } else if (this._masked) {
  274. this._loop = false;
  275. return error(
  276. RangeError,
  277. 'MASK must be clear',
  278. true,
  279. 1002,
  280. 'WS_ERR_UNEXPECTED_MASK'
  281. );
  282. }
  283. if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
  284. else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
  285. else return this.haveLength();
  286. }
  287. /**
  288. * Gets extended payload length (7+16).
  289. *
  290. * @return {(RangeError|undefined)} A possible error
  291. * @private
  292. */
  293. getPayloadLength16() {
  294. if (this._bufferedBytes < 2) {
  295. this._loop = false;
  296. return;
  297. }
  298. this._payloadLength = this.consume(2).readUInt16BE(0);
  299. return this.haveLength();
  300. }
  301. /**
  302. * Gets extended payload length (7+64).
  303. *
  304. * @return {(RangeError|undefined)} A possible error
  305. * @private
  306. */
  307. getPayloadLength64() {
  308. if (this._bufferedBytes < 8) {
  309. this._loop = false;
  310. return;
  311. }
  312. const buf = this.consume(8);
  313. const num = buf.readUInt32BE(0);
  314. //
  315. // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
  316. // if payload length is greater than this number.
  317. //
  318. if (num > Math.pow(2, 53 - 32) - 1) {
  319. this._loop = false;
  320. return error(
  321. RangeError,
  322. 'Unsupported WebSocket frame: payload length > 2^53 - 1',
  323. false,
  324. 1009,
  325. 'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH'
  326. );
  327. }
  328. this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
  329. return this.haveLength();
  330. }
  331. /**
  332. * Payload length has been read.
  333. *
  334. * @return {(RangeError|undefined)} A possible error
  335. * @private
  336. */
  337. haveLength() {
  338. if (this._payloadLength && this._opcode < 0x08) {
  339. this._totalPayloadLength += this._payloadLength;
  340. if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
  341. this._loop = false;
  342. return error(
  343. RangeError,
  344. 'Max payload size exceeded',
  345. false,
  346. 1009,
  347. 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
  348. );
  349. }
  350. }
  351. if (this._masked) this._state = GET_MASK;
  352. else this._state = GET_DATA;
  353. }
  354. /**
  355. * Reads mask bytes.
  356. *
  357. * @private
  358. */
  359. getMask() {
  360. if (this._bufferedBytes < 4) {
  361. this._loop = false;
  362. return;
  363. }
  364. this._mask = this.consume(4);
  365. this._state = GET_DATA;
  366. }
  367. /**
  368. * Reads data bytes.
  369. *
  370. * @param {Function} cb Callback
  371. * @return {(Error|RangeError|undefined)} A possible error
  372. * @private
  373. */
  374. getData(cb) {
  375. let data = EMPTY_BUFFER;
  376. if (this._payloadLength) {
  377. if (this._bufferedBytes < this._payloadLength) {
  378. this._loop = false;
  379. return;
  380. }
  381. data = this.consume(this._payloadLength);
  382. if (
  383. this._masked &&
  384. (this._mask[0] | this._mask[1] | this._mask[2] | this._mask[3]) !== 0
  385. ) {
  386. unmask(data, this._mask);
  387. }
  388. }
  389. if (this._opcode > 0x07) return this.controlMessage(data);
  390. if (this._compressed) {
  391. this._state = INFLATING;
  392. this.decompress(data, cb);
  393. return;
  394. }
  395. if (data.length) {
  396. //
  397. // This message is not compressed so its length is the sum of the payload
  398. // length of all fragments.
  399. //
  400. this._messageLength = this._totalPayloadLength;
  401. this._fragments.push(data);
  402. }
  403. return this.dataMessage();
  404. }
  405. /**
  406. * Decompresses data.
  407. *
  408. * @param {Buffer} data Compressed data
  409. * @param {Function} cb Callback
  410. * @private
  411. */
  412. decompress(data, cb) {
  413. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  414. perMessageDeflate.decompress(data, this._fin, (err, buf) => {
  415. if (err) return cb(err);
  416. if (buf.length) {
  417. this._messageLength += buf.length;
  418. if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
  419. return cb(
  420. error(
  421. RangeError,
  422. 'Max payload size exceeded',
  423. false,
  424. 1009,
  425. 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
  426. )
  427. );
  428. }
  429. this._fragments.push(buf);
  430. }
  431. const er = this.dataMessage();
  432. if (er) return cb(er);
  433. this.startLoop(cb);
  434. });
  435. }
  436. /**
  437. * Handles a data message.
  438. *
  439. * @return {(Error|undefined)} A possible error
  440. * @private
  441. */
  442. dataMessage() {
  443. if (this._fin) {
  444. const messageLength = this._messageLength;
  445. const fragments = this._fragments;
  446. this._totalPayloadLength = 0;
  447. this._messageLength = 0;
  448. this._fragmented = 0;
  449. this._fragments = [];
  450. if (this._opcode === 2) {
  451. let data;
  452. if (this._binaryType === 'nodebuffer') {
  453. data = concat(fragments, messageLength);
  454. } else if (this._binaryType === 'arraybuffer') {
  455. data = toArrayBuffer(concat(fragments, messageLength));
  456. } else {
  457. data = fragments;
  458. }
  459. this.emit('message', data, true);
  460. } else {
  461. const buf = concat(fragments, messageLength);
  462. if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
  463. this._loop = false;
  464. return error(
  465. Error,
  466. 'invalid UTF-8 sequence',
  467. true,
  468. 1007,
  469. 'WS_ERR_INVALID_UTF8'
  470. );
  471. }
  472. this.emit('message', buf, false);
  473. }
  474. }
  475. this._state = GET_INFO;
  476. }
  477. /**
  478. * Handles a control message.
  479. *
  480. * @param {Buffer} data Data to handle
  481. * @return {(Error|RangeError|undefined)} A possible error
  482. * @private
  483. */
  484. controlMessage(data) {
  485. if (this._opcode === 0x08) {
  486. this._loop = false;
  487. if (data.length === 0) {
  488. this.emit('conclude', 1005, EMPTY_BUFFER);
  489. this.end();
  490. } else {
  491. const code = data.readUInt16BE(0);
  492. if (!isValidStatusCode(code)) {
  493. return error(
  494. RangeError,
  495. `invalid status code ${code}`,
  496. true,
  497. 1002,
  498. 'WS_ERR_INVALID_CLOSE_CODE'
  499. );
  500. }
  501. const buf = new FastBuffer(
  502. data.buffer,
  503. data.byteOffset + 2,
  504. data.length - 2
  505. );
  506. if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
  507. return error(
  508. Error,
  509. 'invalid UTF-8 sequence',
  510. true,
  511. 1007,
  512. 'WS_ERR_INVALID_UTF8'
  513. );
  514. }
  515. this.emit('conclude', code, buf);
  516. this.end();
  517. }
  518. } else if (this._opcode === 0x09) {
  519. this.emit('ping', data);
  520. } else {
  521. this.emit('pong', data);
  522. }
  523. this._state = GET_INFO;
  524. }
  525. }
  526. module.exports = Receiver;
  527. /**
  528. * Builds an error object.
  529. *
  530. * @param {function(new:Error|RangeError)} ErrorCtor The error constructor
  531. * @param {String} message The error message
  532. * @param {Boolean} prefix Specifies whether or not to add a default prefix to
  533. * `message`
  534. * @param {Number} statusCode The status code
  535. * @param {String} errorCode The exposed error code
  536. * @return {(Error|RangeError)} The error
  537. * @private
  538. */
  539. function error(ErrorCtor, message, prefix, statusCode, errorCode) {
  540. const err = new ErrorCtor(
  541. prefix ? `Invalid WebSocket frame: ${message}` : message
  542. );
  543. Error.captureStackTrace(err, error);
  544. err.code = errorCode;
  545. err[kStatusCode] = statusCode;
  546. return err;
  547. }