index.js 31 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111
  1. const { EventEmitter } = require('events')
  2. const STREAM_DESTROYED = new Error('Stream was destroyed')
  3. const PREMATURE_CLOSE = new Error('Premature close')
  4. const queueTick = require('queue-tick')
  5. const FIFO = require('fast-fifo')
  6. /* eslint-disable no-multi-spaces */
  7. // 28 bits used total (4 from shared, 14 from read, and 10 from write)
  8. const MAX = ((1 << 28) - 1)
  9. // Shared state
  10. const OPENING = 0b0001
  11. const PREDESTROYING = 0b0010
  12. const DESTROYING = 0b0100
  13. const DESTROYED = 0b1000
  14. const NOT_OPENING = MAX ^ OPENING
  15. const NOT_PREDESTROYING = MAX ^ PREDESTROYING
  16. // Read state (4 bit offset from shared state)
  17. const READ_ACTIVE = 0b00000000000001 << 4
  18. const READ_UPDATING = 0b00000000000010 << 4
  19. const READ_PRIMARY = 0b00000000000100 << 4
  20. const READ_QUEUED = 0b00000000001000 << 4
  21. const READ_RESUMED = 0b00000000010000 << 4
  22. const READ_PIPE_DRAINED = 0b00000000100000 << 4
  23. const READ_ENDING = 0b00000001000000 << 4
  24. const READ_EMIT_DATA = 0b00000010000000 << 4
  25. const READ_EMIT_READABLE = 0b00000100000000 << 4
  26. const READ_EMITTED_READABLE = 0b00001000000000 << 4
  27. const READ_DONE = 0b00010000000000 << 4
  28. const READ_NEXT_TICK = 0b00100000000000 << 4
  29. const READ_NEEDS_PUSH = 0b01000000000000 << 4
  30. const READ_READ_AHEAD = 0b10000000000000 << 4
  31. // Combined read state
  32. const READ_FLOWING = READ_RESUMED | READ_PIPE_DRAINED
  33. const READ_ACTIVE_AND_NEEDS_PUSH = READ_ACTIVE | READ_NEEDS_PUSH
  34. const READ_PRIMARY_AND_ACTIVE = READ_PRIMARY | READ_ACTIVE
  35. const READ_EMIT_READABLE_AND_QUEUED = READ_EMIT_READABLE | READ_QUEUED
  36. const READ_RESUMED_READ_AHEAD = READ_RESUMED | READ_READ_AHEAD
  37. const READ_NOT_ACTIVE = MAX ^ READ_ACTIVE
  38. const READ_NON_PRIMARY = MAX ^ READ_PRIMARY
  39. const READ_NON_PRIMARY_AND_PUSHED = MAX ^ (READ_PRIMARY | READ_NEEDS_PUSH)
  40. const READ_PUSHED = MAX ^ READ_NEEDS_PUSH
  41. const READ_PAUSED = MAX ^ READ_RESUMED
  42. const READ_NOT_QUEUED = MAX ^ (READ_QUEUED | READ_EMITTED_READABLE)
  43. const READ_NOT_ENDING = MAX ^ READ_ENDING
  44. const READ_PIPE_NOT_DRAINED = MAX ^ READ_FLOWING
  45. const READ_NOT_NEXT_TICK = MAX ^ READ_NEXT_TICK
  46. const READ_NOT_UPDATING = MAX ^ READ_UPDATING
  47. const READ_NO_READ_AHEAD = MAX ^ READ_READ_AHEAD
  48. const READ_PAUSED_NO_READ_AHEAD = MAX ^ READ_RESUMED_READ_AHEAD
  49. // Write state (18 bit offset, 4 bit offset from shared state and 13 from read state)
  50. const WRITE_ACTIVE = 0b0000000001 << 18
  51. const WRITE_UPDATING = 0b0000000010 << 18
  52. const WRITE_PRIMARY = 0b0000000100 << 18
  53. const WRITE_QUEUED = 0b0000001000 << 18
  54. const WRITE_UNDRAINED = 0b0000010000 << 18
  55. const WRITE_DONE = 0b0000100000 << 18
  56. const WRITE_EMIT_DRAIN = 0b0001000000 << 18
  57. const WRITE_NEXT_TICK = 0b0010000000 << 18
  58. const WRITE_WRITING = 0b0100000000 << 18
  59. const WRITE_FINISHING = 0b1000000000 << 18
  60. const WRITE_NOT_ACTIVE = MAX ^ (WRITE_ACTIVE | WRITE_WRITING)
  61. const WRITE_NON_PRIMARY = MAX ^ WRITE_PRIMARY
  62. const WRITE_NOT_FINISHING = MAX ^ WRITE_FINISHING
  63. const WRITE_DRAINED = MAX ^ WRITE_UNDRAINED
  64. const WRITE_NOT_QUEUED = MAX ^ WRITE_QUEUED
  65. const WRITE_NOT_NEXT_TICK = MAX ^ WRITE_NEXT_TICK
  66. const WRITE_NOT_UPDATING = MAX ^ WRITE_UPDATING
  67. // Combined shared state
  68. const ACTIVE = READ_ACTIVE | WRITE_ACTIVE
  69. const NOT_ACTIVE = MAX ^ ACTIVE
  70. const DONE = READ_DONE | WRITE_DONE
  71. const DESTROY_STATUS = DESTROYING | DESTROYED | PREDESTROYING
  72. const OPEN_STATUS = DESTROY_STATUS | OPENING
  73. const AUTO_DESTROY = DESTROY_STATUS | DONE
  74. const NON_PRIMARY = WRITE_NON_PRIMARY & READ_NON_PRIMARY
  75. const ACTIVE_OR_TICKING = WRITE_NEXT_TICK | READ_NEXT_TICK
  76. const TICKING = ACTIVE_OR_TICKING & NOT_ACTIVE
  77. const IS_OPENING = OPEN_STATUS | TICKING
  78. // Combined shared state and read state
  79. const READ_PRIMARY_STATUS = OPEN_STATUS | READ_ENDING | READ_DONE
  80. const READ_STATUS = OPEN_STATUS | READ_DONE | READ_QUEUED
  81. const READ_ENDING_STATUS = OPEN_STATUS | READ_ENDING | READ_QUEUED
  82. const READ_READABLE_STATUS = OPEN_STATUS | READ_EMIT_READABLE | READ_QUEUED | READ_EMITTED_READABLE
  83. const SHOULD_NOT_READ = OPEN_STATUS | READ_ACTIVE | READ_ENDING | READ_DONE | READ_NEEDS_PUSH | READ_READ_AHEAD
  84. const READ_BACKPRESSURE_STATUS = DESTROY_STATUS | READ_ENDING | READ_DONE
  85. const READ_UPDATE_SYNC_STATUS = READ_UPDATING | OPEN_STATUS | READ_NEXT_TICK | READ_PRIMARY
  86. // Combined write state
  87. const WRITE_PRIMARY_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_DONE
  88. const WRITE_QUEUED_AND_UNDRAINED = WRITE_QUEUED | WRITE_UNDRAINED
  89. const WRITE_QUEUED_AND_ACTIVE = WRITE_QUEUED | WRITE_ACTIVE
  90. const WRITE_DRAIN_STATUS = WRITE_QUEUED | WRITE_UNDRAINED | OPEN_STATUS | WRITE_ACTIVE
  91. const WRITE_STATUS = OPEN_STATUS | WRITE_ACTIVE | WRITE_QUEUED
  92. const WRITE_PRIMARY_AND_ACTIVE = WRITE_PRIMARY | WRITE_ACTIVE
  93. const WRITE_ACTIVE_AND_WRITING = WRITE_ACTIVE | WRITE_WRITING
  94. const WRITE_FINISHING_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_QUEUED_AND_ACTIVE | WRITE_DONE
  95. const WRITE_BACKPRESSURE_STATUS = WRITE_UNDRAINED | DESTROY_STATUS | WRITE_FINISHING | WRITE_DONE
  96. const WRITE_UPDATE_SYNC_STATUS = WRITE_UPDATING | OPEN_STATUS | WRITE_NEXT_TICK | WRITE_PRIMARY
  97. const asyncIterator = Symbol.asyncIterator || Symbol('asyncIterator')
  98. class WritableState {
  99. constructor (stream, { highWaterMark = 16384, map = null, mapWritable, byteLength, byteLengthWritable } = {}) {
  100. this.stream = stream
  101. this.queue = new FIFO()
  102. this.highWaterMark = highWaterMark
  103. this.buffered = 0
  104. this.error = null
  105. this.pipeline = null
  106. this.drains = null // if we add more seldomly used helpers we might them into a subobject so its a single ptr
  107. this.byteLength = byteLengthWritable || byteLength || defaultByteLength
  108. this.map = mapWritable || map
  109. this.afterWrite = afterWrite.bind(this)
  110. this.afterUpdateNextTick = updateWriteNT.bind(this)
  111. }
  112. get ended () {
  113. return (this.stream._duplexState & WRITE_DONE) !== 0
  114. }
  115. push (data) {
  116. if (this.map !== null) data = this.map(data)
  117. this.buffered += this.byteLength(data)
  118. this.queue.push(data)
  119. if (this.buffered < this.highWaterMark) {
  120. this.stream._duplexState |= WRITE_QUEUED
  121. return true
  122. }
  123. this.stream._duplexState |= WRITE_QUEUED_AND_UNDRAINED
  124. return false
  125. }
  126. shift () {
  127. const data = this.queue.shift()
  128. this.buffered -= this.byteLength(data)
  129. if (this.buffered === 0) this.stream._duplexState &= WRITE_NOT_QUEUED
  130. return data
  131. }
  132. end (data) {
  133. if (typeof data === 'function') this.stream.once('finish', data)
  134. else if (data !== undefined && data !== null) this.push(data)
  135. this.stream._duplexState = (this.stream._duplexState | WRITE_FINISHING) & WRITE_NON_PRIMARY
  136. }
  137. autoBatch (data, cb) {
  138. const buffer = []
  139. const stream = this.stream
  140. buffer.push(data)
  141. while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED_AND_ACTIVE) {
  142. buffer.push(stream._writableState.shift())
  143. }
  144. if ((stream._duplexState & OPEN_STATUS) !== 0) return cb(null)
  145. stream._writev(buffer, cb)
  146. }
  147. update () {
  148. const stream = this.stream
  149. stream._duplexState |= WRITE_UPDATING
  150. do {
  151. while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED) {
  152. const data = this.shift()
  153. stream._duplexState |= WRITE_ACTIVE_AND_WRITING
  154. stream._write(data, this.afterWrite)
  155. }
  156. if ((stream._duplexState & WRITE_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
  157. } while (this.continueUpdate() === true)
  158. stream._duplexState &= WRITE_NOT_UPDATING
  159. }
  160. updateNonPrimary () {
  161. const stream = this.stream
  162. if ((stream._duplexState & WRITE_FINISHING_STATUS) === WRITE_FINISHING) {
  163. stream._duplexState = (stream._duplexState | WRITE_ACTIVE) & WRITE_NOT_FINISHING
  164. stream._final(afterFinal.bind(this))
  165. return
  166. }
  167. if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
  168. if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {
  169. stream._duplexState |= ACTIVE
  170. stream._destroy(afterDestroy.bind(this))
  171. }
  172. return
  173. }
  174. if ((stream._duplexState & IS_OPENING) === OPENING) {
  175. stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
  176. stream._open(afterOpen.bind(this))
  177. }
  178. }
  179. continueUpdate () {
  180. if ((this.stream._duplexState & WRITE_NEXT_TICK) === 0) return false
  181. this.stream._duplexState &= WRITE_NOT_NEXT_TICK
  182. return true
  183. }
  184. updateCallback () {
  185. if ((this.stream._duplexState & WRITE_UPDATE_SYNC_STATUS) === WRITE_PRIMARY) this.update()
  186. else this.updateNextTick()
  187. }
  188. updateNextTick () {
  189. if ((this.stream._duplexState & WRITE_NEXT_TICK) !== 0) return
  190. this.stream._duplexState |= WRITE_NEXT_TICK
  191. if ((this.stream._duplexState & WRITE_UPDATING) === 0) queueTick(this.afterUpdateNextTick)
  192. }
  193. }
  194. class ReadableState {
  195. constructor (stream, { highWaterMark = 16384, map = null, mapReadable, byteLength, byteLengthReadable } = {}) {
  196. this.stream = stream
  197. this.queue = new FIFO()
  198. this.highWaterMark = highWaterMark === 0 ? 1 : highWaterMark
  199. this.buffered = 0
  200. this.readAhead = highWaterMark > 0
  201. this.error = null
  202. this.pipeline = null
  203. this.byteLength = byteLengthReadable || byteLength || defaultByteLength
  204. this.map = mapReadable || map
  205. this.pipeTo = null
  206. this.afterRead = afterRead.bind(this)
  207. this.afterUpdateNextTick = updateReadNT.bind(this)
  208. }
  209. get ended () {
  210. return (this.stream._duplexState & READ_DONE) !== 0
  211. }
  212. pipe (pipeTo, cb) {
  213. if (this.pipeTo !== null) throw new Error('Can only pipe to one destination')
  214. if (typeof cb !== 'function') cb = null
  215. this.stream._duplexState |= READ_PIPE_DRAINED
  216. this.pipeTo = pipeTo
  217. this.pipeline = new Pipeline(this.stream, pipeTo, cb)
  218. if (cb) this.stream.on('error', noop) // We already error handle this so supress crashes
  219. if (isStreamx(pipeTo)) {
  220. pipeTo._writableState.pipeline = this.pipeline
  221. if (cb) pipeTo.on('error', noop) // We already error handle this so supress crashes
  222. pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline)) // TODO: just call finished from pipeTo itself
  223. } else {
  224. const onerror = this.pipeline.done.bind(this.pipeline, pipeTo)
  225. const onclose = this.pipeline.done.bind(this.pipeline, pipeTo, null) // onclose has a weird bool arg
  226. pipeTo.on('error', onerror)
  227. pipeTo.on('close', onclose)
  228. pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline))
  229. }
  230. pipeTo.on('drain', afterDrain.bind(this))
  231. this.stream.emit('piping', pipeTo)
  232. pipeTo.emit('pipe', this.stream)
  233. }
  234. push (data) {
  235. const stream = this.stream
  236. if (data === null) {
  237. this.highWaterMark = 0
  238. stream._duplexState = (stream._duplexState | READ_ENDING) & READ_NON_PRIMARY_AND_PUSHED
  239. return false
  240. }
  241. if (this.map !== null) data = this.map(data)
  242. this.buffered += this.byteLength(data)
  243. this.queue.push(data)
  244. stream._duplexState = (stream._duplexState | READ_QUEUED) & READ_PUSHED
  245. return this.buffered < this.highWaterMark
  246. }
  247. shift () {
  248. const data = this.queue.shift()
  249. this.buffered -= this.byteLength(data)
  250. if (this.buffered === 0) this.stream._duplexState &= READ_NOT_QUEUED
  251. return data
  252. }
  253. unshift (data) {
  254. const pending = [this.map !== null ? this.map(data) : data]
  255. while (this.buffered > 0) pending.push(this.shift())
  256. for (let i = 0; i < pending.length - 1; i++) {
  257. const data = pending[i]
  258. this.buffered += this.byteLength(data)
  259. this.queue.push(data)
  260. }
  261. this.push(pending[pending.length - 1])
  262. }
  263. read () {
  264. const stream = this.stream
  265. if ((stream._duplexState & READ_STATUS) === READ_QUEUED) {
  266. const data = this.shift()
  267. if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED
  268. if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)
  269. return data
  270. }
  271. if (this.readAhead === false) {
  272. stream._duplexState |= READ_READ_AHEAD
  273. this.updateNextTick()
  274. }
  275. return null
  276. }
  277. drain () {
  278. const stream = this.stream
  279. while ((stream._duplexState & READ_STATUS) === READ_QUEUED && (stream._duplexState & READ_FLOWING) !== 0) {
  280. const data = this.shift()
  281. if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED
  282. if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)
  283. }
  284. }
  285. update () {
  286. const stream = this.stream
  287. stream._duplexState |= READ_UPDATING
  288. do {
  289. this.drain()
  290. while (this.buffered < this.highWaterMark && (stream._duplexState & SHOULD_NOT_READ) === READ_READ_AHEAD) {
  291. stream._duplexState |= READ_ACTIVE_AND_NEEDS_PUSH
  292. stream._read(this.afterRead)
  293. this.drain()
  294. }
  295. if ((stream._duplexState & READ_READABLE_STATUS) === READ_EMIT_READABLE_AND_QUEUED) {
  296. stream._duplexState |= READ_EMITTED_READABLE
  297. stream.emit('readable')
  298. }
  299. if ((stream._duplexState & READ_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
  300. } while (this.continueUpdate() === true)
  301. stream._duplexState &= READ_NOT_UPDATING
  302. }
  303. updateNonPrimary () {
  304. const stream = this.stream
  305. if ((stream._duplexState & READ_ENDING_STATUS) === READ_ENDING) {
  306. stream._duplexState = (stream._duplexState | READ_DONE) & READ_NOT_ENDING
  307. stream.emit('end')
  308. if ((stream._duplexState & AUTO_DESTROY) === DONE) stream._duplexState |= DESTROYING
  309. if (this.pipeTo !== null) this.pipeTo.end()
  310. }
  311. if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
  312. if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {
  313. stream._duplexState |= ACTIVE
  314. stream._destroy(afterDestroy.bind(this))
  315. }
  316. return
  317. }
  318. if ((stream._duplexState & IS_OPENING) === OPENING) {
  319. stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
  320. stream._open(afterOpen.bind(this))
  321. }
  322. }
  323. continueUpdate () {
  324. if ((this.stream._duplexState & READ_NEXT_TICK) === 0) return false
  325. this.stream._duplexState &= READ_NOT_NEXT_TICK
  326. return true
  327. }
  328. updateCallback () {
  329. if ((this.stream._duplexState & READ_UPDATE_SYNC_STATUS) === READ_PRIMARY) this.update()
  330. else this.updateNextTick()
  331. }
  332. updateNextTick () {
  333. if ((this.stream._duplexState & READ_NEXT_TICK) !== 0) return
  334. this.stream._duplexState |= READ_NEXT_TICK
  335. if ((this.stream._duplexState & READ_UPDATING) === 0) queueTick(this.afterUpdateNextTick)
  336. }
  337. }
  338. class TransformState {
  339. constructor (stream) {
  340. this.data = null
  341. this.afterTransform = afterTransform.bind(stream)
  342. this.afterFinal = null
  343. }
  344. }
  345. class Pipeline {
  346. constructor (src, dst, cb) {
  347. this.from = src
  348. this.to = dst
  349. this.afterPipe = cb
  350. this.error = null
  351. this.pipeToFinished = false
  352. }
  353. finished () {
  354. this.pipeToFinished = true
  355. }
  356. done (stream, err) {
  357. if (err) this.error = err
  358. if (stream === this.to) {
  359. this.to = null
  360. if (this.from !== null) {
  361. if ((this.from._duplexState & READ_DONE) === 0 || !this.pipeToFinished) {
  362. this.from.destroy(this.error || new Error('Writable stream closed prematurely'))
  363. }
  364. return
  365. }
  366. }
  367. if (stream === this.from) {
  368. this.from = null
  369. if (this.to !== null) {
  370. if ((stream._duplexState & READ_DONE) === 0) {
  371. this.to.destroy(this.error || new Error('Readable stream closed before ending'))
  372. }
  373. return
  374. }
  375. }
  376. if (this.afterPipe !== null) this.afterPipe(this.error)
  377. this.to = this.from = this.afterPipe = null
  378. }
  379. }
  380. function afterDrain () {
  381. this.stream._duplexState |= READ_PIPE_DRAINED
  382. this.updateCallback()
  383. }
  384. function afterFinal (err) {
  385. const stream = this.stream
  386. if (err) stream.destroy(err)
  387. if ((stream._duplexState & DESTROY_STATUS) === 0) {
  388. stream._duplexState |= WRITE_DONE
  389. stream.emit('finish')
  390. }
  391. if ((stream._duplexState & AUTO_DESTROY) === DONE) {
  392. stream._duplexState |= DESTROYING
  393. }
  394. stream._duplexState &= WRITE_NOT_ACTIVE
  395. // no need to wait the extra tick here, so we short circuit that
  396. if ((stream._duplexState & WRITE_UPDATING) === 0) this.update()
  397. else this.updateNextTick()
  398. }
  399. function afterDestroy (err) {
  400. const stream = this.stream
  401. if (!err && this.error !== STREAM_DESTROYED) err = this.error
  402. if (err) stream.emit('error', err)
  403. stream._duplexState |= DESTROYED
  404. stream.emit('close')
  405. const rs = stream._readableState
  406. const ws = stream._writableState
  407. if (rs !== null && rs.pipeline !== null) rs.pipeline.done(stream, err)
  408. if (ws !== null) {
  409. while (ws.drains !== null && ws.drains.length > 0) ws.drains.shift().resolve(false)
  410. if (ws.pipeline !== null) ws.pipeline.done(stream, err)
  411. }
  412. }
  413. function afterWrite (err) {
  414. const stream = this.stream
  415. if (err) stream.destroy(err)
  416. stream._duplexState &= WRITE_NOT_ACTIVE
  417. if (this.drains !== null) tickDrains(this.drains)
  418. if ((stream._duplexState & WRITE_DRAIN_STATUS) === WRITE_UNDRAINED) {
  419. stream._duplexState &= WRITE_DRAINED
  420. if ((stream._duplexState & WRITE_EMIT_DRAIN) === WRITE_EMIT_DRAIN) {
  421. stream.emit('drain')
  422. }
  423. }
  424. this.updateCallback()
  425. }
  426. function afterRead (err) {
  427. if (err) this.stream.destroy(err)
  428. this.stream._duplexState &= READ_NOT_ACTIVE
  429. if (this.readAhead === false && (this.stream._duplexState & READ_RESUMED) === 0) this.stream._duplexState &= READ_NO_READ_AHEAD
  430. this.updateCallback()
  431. }
  432. function updateReadNT () {
  433. if ((this.stream._duplexState & READ_UPDATING) === 0) {
  434. this.stream._duplexState &= READ_NOT_NEXT_TICK
  435. this.update()
  436. }
  437. }
  438. function updateWriteNT () {
  439. if ((this.stream._duplexState & WRITE_UPDATING) === 0) {
  440. this.stream._duplexState &= WRITE_NOT_NEXT_TICK
  441. this.update()
  442. }
  443. }
  444. function tickDrains (drains) {
  445. for (let i = 0; i < drains.length; i++) {
  446. // drains.writes are monotonic, so if one is 0 its always the first one
  447. if (--drains[i].writes === 0) {
  448. drains.shift().resolve(true)
  449. i--
  450. }
  451. }
  452. }
  453. function afterOpen (err) {
  454. const stream = this.stream
  455. if (err) stream.destroy(err)
  456. if ((stream._duplexState & DESTROYING) === 0) {
  457. if ((stream._duplexState & READ_PRIMARY_STATUS) === 0) stream._duplexState |= READ_PRIMARY
  458. if ((stream._duplexState & WRITE_PRIMARY_STATUS) === 0) stream._duplexState |= WRITE_PRIMARY
  459. stream.emit('open')
  460. }
  461. stream._duplexState &= NOT_ACTIVE
  462. if (stream._writableState !== null) {
  463. stream._writableState.updateCallback()
  464. }
  465. if (stream._readableState !== null) {
  466. stream._readableState.updateCallback()
  467. }
  468. }
  469. function afterTransform (err, data) {
  470. if (data !== undefined && data !== null) this.push(data)
  471. this._writableState.afterWrite(err)
  472. }
  473. function newListener (name) {
  474. if (this._readableState !== null) {
  475. if (name === 'data') {
  476. this._duplexState |= (READ_EMIT_DATA | READ_RESUMED_READ_AHEAD)
  477. this._readableState.updateNextTick()
  478. }
  479. if (name === 'readable') {
  480. this._duplexState |= READ_EMIT_READABLE
  481. this._readableState.updateNextTick()
  482. }
  483. }
  484. if (this._writableState !== null) {
  485. if (name === 'drain') {
  486. this._duplexState |= WRITE_EMIT_DRAIN
  487. this._writableState.updateNextTick()
  488. }
  489. }
  490. }
  491. class Stream extends EventEmitter {
  492. constructor (opts) {
  493. super()
  494. this._duplexState = 0
  495. this._readableState = null
  496. this._writableState = null
  497. if (opts) {
  498. if (opts.open) this._open = opts.open
  499. if (opts.destroy) this._destroy = opts.destroy
  500. if (opts.predestroy) this._predestroy = opts.predestroy
  501. if (opts.signal) {
  502. opts.signal.addEventListener('abort', abort.bind(this))
  503. }
  504. }
  505. this.on('newListener', newListener)
  506. }
  507. _open (cb) {
  508. cb(null)
  509. }
  510. _destroy (cb) {
  511. cb(null)
  512. }
  513. _predestroy () {
  514. // does nothing
  515. }
  516. get readable () {
  517. return this._readableState !== null ? true : undefined
  518. }
  519. get writable () {
  520. return this._writableState !== null ? true : undefined
  521. }
  522. get destroyed () {
  523. return (this._duplexState & DESTROYED) !== 0
  524. }
  525. get destroying () {
  526. return (this._duplexState & DESTROY_STATUS) !== 0
  527. }
  528. destroy (err) {
  529. if ((this._duplexState & DESTROY_STATUS) === 0) {
  530. if (!err) err = STREAM_DESTROYED
  531. this._duplexState = (this._duplexState | DESTROYING) & NON_PRIMARY
  532. if (this._readableState !== null) {
  533. this._readableState.highWaterMark = 0
  534. this._readableState.error = err
  535. }
  536. if (this._writableState !== null) {
  537. this._writableState.highWaterMark = 0
  538. this._writableState.error = err
  539. }
  540. this._duplexState |= PREDESTROYING
  541. this._predestroy()
  542. this._duplexState &= NOT_PREDESTROYING
  543. if (this._readableState !== null) this._readableState.updateNextTick()
  544. if (this._writableState !== null) this._writableState.updateNextTick()
  545. }
  546. }
  547. }
  548. class Readable extends Stream {
  549. constructor (opts) {
  550. super(opts)
  551. this._duplexState |= OPENING | WRITE_DONE | READ_READ_AHEAD
  552. this._readableState = new ReadableState(this, opts)
  553. if (opts) {
  554. if (this._readableState.readAhead === false) this._duplexState &= READ_NO_READ_AHEAD
  555. if (opts.read) this._read = opts.read
  556. if (opts.eagerOpen) this._readableState.updateNextTick()
  557. }
  558. }
  559. _read (cb) {
  560. cb(null)
  561. }
  562. pipe (dest, cb) {
  563. this._readableState.updateNextTick()
  564. this._readableState.pipe(dest, cb)
  565. return dest
  566. }
  567. read () {
  568. this._readableState.updateNextTick()
  569. return this._readableState.read()
  570. }
  571. push (data) {
  572. this._readableState.updateNextTick()
  573. return this._readableState.push(data)
  574. }
  575. unshift (data) {
  576. this._readableState.updateNextTick()
  577. return this._readableState.unshift(data)
  578. }
  579. resume () {
  580. this._duplexState |= READ_RESUMED_READ_AHEAD
  581. this._readableState.updateNextTick()
  582. return this
  583. }
  584. pause () {
  585. this._duplexState &= (this._readableState.readAhead === false ? READ_PAUSED_NO_READ_AHEAD : READ_PAUSED)
  586. return this
  587. }
  588. static _fromAsyncIterator (ite, opts) {
  589. let destroy
  590. const rs = new Readable({
  591. ...opts,
  592. read (cb) {
  593. ite.next().then(push).then(cb.bind(null, null)).catch(cb)
  594. },
  595. predestroy () {
  596. destroy = ite.return()
  597. },
  598. destroy (cb) {
  599. if (!destroy) return cb(null)
  600. destroy.then(cb.bind(null, null)).catch(cb)
  601. }
  602. })
  603. return rs
  604. function push (data) {
  605. if (data.done) rs.push(null)
  606. else rs.push(data.value)
  607. }
  608. }
  609. static from (data, opts) {
  610. if (isReadStreamx(data)) return data
  611. if (data[asyncIterator]) return this._fromAsyncIterator(data[asyncIterator](), opts)
  612. if (!Array.isArray(data)) data = data === undefined ? [] : [data]
  613. let i = 0
  614. return new Readable({
  615. ...opts,
  616. read (cb) {
  617. this.push(i === data.length ? null : data[i++])
  618. cb(null)
  619. }
  620. })
  621. }
  622. static isBackpressured (rs) {
  623. return (rs._duplexState & READ_BACKPRESSURE_STATUS) !== 0 || rs._readableState.buffered >= rs._readableState.highWaterMark
  624. }
  625. static isPaused (rs) {
  626. return (rs._duplexState & READ_RESUMED) === 0
  627. }
  628. [asyncIterator] () {
  629. const stream = this
  630. let error = null
  631. let promiseResolve = null
  632. let promiseReject = null
  633. this.on('error', (err) => { error = err })
  634. this.on('readable', onreadable)
  635. this.on('close', onclose)
  636. return {
  637. [asyncIterator] () {
  638. return this
  639. },
  640. next () {
  641. return new Promise(function (resolve, reject) {
  642. promiseResolve = resolve
  643. promiseReject = reject
  644. const data = stream.read()
  645. if (data !== null) ondata(data)
  646. else if ((stream._duplexState & DESTROYED) !== 0) ondata(null)
  647. })
  648. },
  649. return () {
  650. return destroy(null)
  651. },
  652. throw (err) {
  653. return destroy(err)
  654. }
  655. }
  656. function onreadable () {
  657. if (promiseResolve !== null) ondata(stream.read())
  658. }
  659. function onclose () {
  660. if (promiseResolve !== null) ondata(null)
  661. }
  662. function ondata (data) {
  663. if (promiseReject === null) return
  664. if (error) promiseReject(error)
  665. else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED)
  666. else promiseResolve({ value: data, done: data === null })
  667. promiseReject = promiseResolve = null
  668. }
  669. function destroy (err) {
  670. stream.destroy(err)
  671. return new Promise((resolve, reject) => {
  672. if (stream._duplexState & DESTROYED) return resolve({ value: undefined, done: true })
  673. stream.once('close', function () {
  674. if (err) reject(err)
  675. else resolve({ value: undefined, done: true })
  676. })
  677. })
  678. }
  679. }
  680. }
  681. class Writable extends Stream {
  682. constructor (opts) {
  683. super(opts)
  684. this._duplexState |= OPENING | READ_DONE
  685. this._writableState = new WritableState(this, opts)
  686. if (opts) {
  687. if (opts.writev) this._writev = opts.writev
  688. if (opts.write) this._write = opts.write
  689. if (opts.final) this._final = opts.final
  690. if (opts.eagerOpen) this._writableState.updateNextTick()
  691. }
  692. }
  693. _writev (batch, cb) {
  694. cb(null)
  695. }
  696. _write (data, cb) {
  697. this._writableState.autoBatch(data, cb)
  698. }
  699. _final (cb) {
  700. cb(null)
  701. }
  702. static isBackpressured (ws) {
  703. return (ws._duplexState & WRITE_BACKPRESSURE_STATUS) !== 0
  704. }
  705. static drained (ws) {
  706. if (ws.destroyed) return Promise.resolve(false)
  707. const state = ws._writableState
  708. const pending = (isWritev(ws) ? Math.min(1, state.queue.length) : state.queue.length)
  709. const writes = pending + ((ws._duplexState & WRITE_WRITING) ? 1 : 0)
  710. if (writes === 0) return Promise.resolve(true)
  711. if (state.drains === null) state.drains = []
  712. return new Promise((resolve) => {
  713. state.drains.push({ writes, resolve })
  714. })
  715. }
  716. write (data) {
  717. this._writableState.updateNextTick()
  718. return this._writableState.push(data)
  719. }
  720. end (data) {
  721. this._writableState.updateNextTick()
  722. this._writableState.end(data)
  723. return this
  724. }
  725. }
  726. class Duplex extends Readable { // and Writable
  727. constructor (opts) {
  728. super(opts)
  729. this._duplexState = OPENING | (this._duplexState & READ_READ_AHEAD)
  730. this._writableState = new WritableState(this, opts)
  731. if (opts) {
  732. if (opts.writev) this._writev = opts.writev
  733. if (opts.write) this._write = opts.write
  734. if (opts.final) this._final = opts.final
  735. }
  736. }
  737. _writev (batch, cb) {
  738. cb(null)
  739. }
  740. _write (data, cb) {
  741. this._writableState.autoBatch(data, cb)
  742. }
  743. _final (cb) {
  744. cb(null)
  745. }
  746. write (data) {
  747. this._writableState.updateNextTick()
  748. return this._writableState.push(data)
  749. }
  750. end (data) {
  751. this._writableState.updateNextTick()
  752. this._writableState.end(data)
  753. return this
  754. }
  755. }
  756. class Transform extends Duplex {
  757. constructor (opts) {
  758. super(opts)
  759. this._transformState = new TransformState(this)
  760. if (opts) {
  761. if (opts.transform) this._transform = opts.transform
  762. if (opts.flush) this._flush = opts.flush
  763. }
  764. }
  765. _write (data, cb) {
  766. if (this._readableState.buffered >= this._readableState.highWaterMark) {
  767. this._transformState.data = data
  768. } else {
  769. this._transform(data, this._transformState.afterTransform)
  770. }
  771. }
  772. _read (cb) {
  773. if (this._transformState.data !== null) {
  774. const data = this._transformState.data
  775. this._transformState.data = null
  776. cb(null)
  777. this._transform(data, this._transformState.afterTransform)
  778. } else {
  779. cb(null)
  780. }
  781. }
  782. destroy (err) {
  783. super.destroy(err)
  784. if (this._transformState.data !== null) {
  785. this._transformState.data = null
  786. this._transformState.afterTransform()
  787. }
  788. }
  789. _transform (data, cb) {
  790. cb(null, data)
  791. }
  792. _flush (cb) {
  793. cb(null)
  794. }
  795. _final (cb) {
  796. this._transformState.afterFinal = cb
  797. this._flush(transformAfterFlush.bind(this))
  798. }
  799. }
  800. class PassThrough extends Transform {}
  801. function transformAfterFlush (err, data) {
  802. const cb = this._transformState.afterFinal
  803. if (err) return cb(err)
  804. if (data !== null && data !== undefined) this.push(data)
  805. this.push(null)
  806. cb(null)
  807. }
  808. function pipelinePromise (...streams) {
  809. return new Promise((resolve, reject) => {
  810. return pipeline(...streams, (err) => {
  811. if (err) return reject(err)
  812. resolve()
  813. })
  814. })
  815. }
  816. function pipeline (stream, ...streams) {
  817. const all = Array.isArray(stream) ? [...stream, ...streams] : [stream, ...streams]
  818. const done = (all.length && typeof all[all.length - 1] === 'function') ? all.pop() : null
  819. if (all.length < 2) throw new Error('Pipeline requires at least 2 streams')
  820. let src = all[0]
  821. let dest = null
  822. let error = null
  823. for (let i = 1; i < all.length; i++) {
  824. dest = all[i]
  825. if (isStreamx(src)) {
  826. src.pipe(dest, onerror)
  827. } else {
  828. errorHandle(src, true, i > 1, onerror)
  829. src.pipe(dest)
  830. }
  831. src = dest
  832. }
  833. if (done) {
  834. let fin = false
  835. const autoDestroy = isStreamx(dest) || !!(dest._writableState && dest._writableState.autoDestroy)
  836. dest.on('error', (err) => {
  837. if (error === null) error = err
  838. })
  839. dest.on('finish', () => {
  840. fin = true
  841. if (!autoDestroy) done(error)
  842. })
  843. if (autoDestroy) {
  844. dest.on('close', () => done(error || (fin ? null : PREMATURE_CLOSE)))
  845. }
  846. }
  847. return dest
  848. function errorHandle (s, rd, wr, onerror) {
  849. s.on('error', onerror)
  850. s.on('close', onclose)
  851. function onclose () {
  852. if (rd && s._readableState && !s._readableState.ended) return onerror(PREMATURE_CLOSE)
  853. if (wr && s._writableState && !s._writableState.ended) return onerror(PREMATURE_CLOSE)
  854. }
  855. }
  856. function onerror (err) {
  857. if (!err || error) return
  858. error = err
  859. for (const s of all) {
  860. s.destroy(err)
  861. }
  862. }
  863. }
  864. function isStream (stream) {
  865. return !!stream._readableState || !!stream._writableState
  866. }
  867. function isStreamx (stream) {
  868. return typeof stream._duplexState === 'number' && isStream(stream)
  869. }
  870. function getStreamError (stream) {
  871. const err = (stream._readableState && stream._readableState.error) || (stream._writableState && stream._writableState.error)
  872. return err === STREAM_DESTROYED ? null : err // only explicit errors
  873. }
  874. function isReadStreamx (stream) {
  875. return isStreamx(stream) && stream.readable
  876. }
  877. function isTypedArray (data) {
  878. return typeof data === 'object' && data !== null && typeof data.byteLength === 'number'
  879. }
  880. function defaultByteLength (data) {
  881. return isTypedArray(data) ? data.byteLength : 1024
  882. }
  883. function noop () {}
  884. function abort () {
  885. this.destroy(new Error('Stream aborted.'))
  886. }
  887. function isWritev (s) {
  888. return s._writev !== Writable.prototype._writev && s._writev !== Duplex.prototype._writev
  889. }
  890. module.exports = {
  891. pipeline,
  892. pipelinePromise,
  893. isStream,
  894. isStreamx,
  895. getStreamError,
  896. Stream,
  897. Writable,
  898. Readable,
  899. Duplex,
  900. Transform,
  901. // Export PassThrough for compatibility with Node.js core's stream module
  902. PassThrough
  903. }