12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111 |
- const { EventEmitter } = require('events')
- const STREAM_DESTROYED = new Error('Stream was destroyed') // shared sentinel; compared by identity (===) elsewhere in this file
- const PREMATURE_CLOSE = new Error('Premature close') // shared sentinel reported by pipeline() when a stream closes before ending
- const queueTick = require('queue-tick')
- const FIFO = require('fast-fifo')
- /* eslint-disable no-multi-spaces */
- // 28 bits used in total (4 from shared, 14 from read, and 10 from write); all of them live in one integer, stream._duplexState
- const MAX = ((1 << 28) - 1) // all 28 state bits set; `MAX ^ FLAG` yields the mask that clears FLAG
- // Shared state (bits 0-3)
- const OPENING = 0b0001
- const PREDESTROYING = 0b0010
- const DESTROYING = 0b0100
- const DESTROYED = 0b1000
- const NOT_OPENING = MAX ^ OPENING
- const NOT_PREDESTROYING = MAX ^ PREDESTROYING
- // Read state (4 bit offset from shared state, 14 bits)
- const READ_ACTIVE = 0b00000000000001 << 4
- const READ_UPDATING = 0b00000000000010 << 4
- const READ_PRIMARY = 0b00000000000100 << 4
- const READ_QUEUED = 0b00000000001000 << 4
- const READ_RESUMED = 0b00000000010000 << 4
- const READ_PIPE_DRAINED = 0b00000000100000 << 4
- const READ_ENDING = 0b00000001000000 << 4
- const READ_EMIT_DATA = 0b00000010000000 << 4
- const READ_EMIT_READABLE = 0b00000100000000 << 4
- const READ_EMITTED_READABLE = 0b00001000000000 << 4
- const READ_DONE = 0b00010000000000 << 4
- const READ_NEXT_TICK = 0b00100000000000 << 4
- const READ_NEEDS_PUSH = 0b01000000000000 << 4
- const READ_READ_AHEAD = 0b10000000000000 << 4
- // Combined read state (unions to set or test, and `MAX ^ ...` masks to clear)
- const READ_FLOWING = READ_RESUMED | READ_PIPE_DRAINED
- const READ_ACTIVE_AND_NEEDS_PUSH = READ_ACTIVE | READ_NEEDS_PUSH
- const READ_PRIMARY_AND_ACTIVE = READ_PRIMARY | READ_ACTIVE
- const READ_EMIT_READABLE_AND_QUEUED = READ_EMIT_READABLE | READ_QUEUED
- const READ_RESUMED_READ_AHEAD = READ_RESUMED | READ_READ_AHEAD
- const READ_NOT_ACTIVE = MAX ^ READ_ACTIVE
- const READ_NON_PRIMARY = MAX ^ READ_PRIMARY
- const READ_NON_PRIMARY_AND_PUSHED = MAX ^ (READ_PRIMARY | READ_NEEDS_PUSH)
- const READ_PUSHED = MAX ^ READ_NEEDS_PUSH
- const READ_PAUSED = MAX ^ READ_RESUMED
- const READ_NOT_QUEUED = MAX ^ (READ_QUEUED | READ_EMITTED_READABLE)
- const READ_NOT_ENDING = MAX ^ READ_ENDING
- const READ_PIPE_NOT_DRAINED = MAX ^ READ_FLOWING
- const READ_NOT_NEXT_TICK = MAX ^ READ_NEXT_TICK
- const READ_NOT_UPDATING = MAX ^ READ_UPDATING
- const READ_NO_READ_AHEAD = MAX ^ READ_READ_AHEAD
- const READ_PAUSED_NO_READ_AHEAD = MAX ^ READ_RESUMED_READ_AHEAD
- // Write state (18 bit offset: 4 bits of shared state plus 14 bits of read state; 10 bits)
- const WRITE_ACTIVE = 0b0000000001 << 18
- const WRITE_UPDATING = 0b0000000010 << 18
- const WRITE_PRIMARY = 0b0000000100 << 18
- const WRITE_QUEUED = 0b0000001000 << 18
- const WRITE_UNDRAINED = 0b0000010000 << 18
- const WRITE_DONE = 0b0000100000 << 18
- const WRITE_EMIT_DRAIN = 0b0001000000 << 18
- const WRITE_NEXT_TICK = 0b0010000000 << 18
- const WRITE_WRITING = 0b0100000000 << 18
- const WRITE_FINISHING = 0b1000000000 << 18
- const WRITE_NOT_ACTIVE = MAX ^ (WRITE_ACTIVE | WRITE_WRITING)
- const WRITE_NON_PRIMARY = MAX ^ WRITE_PRIMARY
- const WRITE_NOT_FINISHING = MAX ^ WRITE_FINISHING
- const WRITE_DRAINED = MAX ^ WRITE_UNDRAINED
- const WRITE_NOT_QUEUED = MAX ^ WRITE_QUEUED
- const WRITE_NOT_NEXT_TICK = MAX ^ WRITE_NEXT_TICK
- const WRITE_NOT_UPDATING = MAX ^ WRITE_UPDATING
- // Combined shared state
- const ACTIVE = READ_ACTIVE | WRITE_ACTIVE
- const NOT_ACTIVE = MAX ^ ACTIVE
- const DONE = READ_DONE | WRITE_DONE
- const DESTROY_STATUS = DESTROYING | DESTROYED | PREDESTROYING
- const OPEN_STATUS = DESTROY_STATUS | OPENING
- const AUTO_DESTROY = DESTROY_STATUS | DONE
- const NON_PRIMARY = WRITE_NON_PRIMARY & READ_NON_PRIMARY
- const ACTIVE_OR_TICKING = WRITE_NEXT_TICK | READ_NEXT_TICK
- const TICKING = ACTIVE_OR_TICKING & NOT_ACTIVE
- const IS_OPENING = OPEN_STATUS | TICKING
- // Combined shared state and read state (masks used as `(state & MASK) === EXPECTED` tests)
- const READ_PRIMARY_STATUS = OPEN_STATUS | READ_ENDING | READ_DONE
- const READ_STATUS = OPEN_STATUS | READ_DONE | READ_QUEUED
- const READ_ENDING_STATUS = OPEN_STATUS | READ_ENDING | READ_QUEUED
- const READ_READABLE_STATUS = OPEN_STATUS | READ_EMIT_READABLE | READ_QUEUED | READ_EMITTED_READABLE
- const SHOULD_NOT_READ = OPEN_STATUS | READ_ACTIVE | READ_ENDING | READ_DONE | READ_NEEDS_PUSH | READ_READ_AHEAD
- const READ_BACKPRESSURE_STATUS = DESTROY_STATUS | READ_ENDING | READ_DONE
- const READ_UPDATE_SYNC_STATUS = READ_UPDATING | OPEN_STATUS | READ_NEXT_TICK | READ_PRIMARY
- // Combined shared state and write state
- const WRITE_PRIMARY_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_DONE
- const WRITE_QUEUED_AND_UNDRAINED = WRITE_QUEUED | WRITE_UNDRAINED
- const WRITE_QUEUED_AND_ACTIVE = WRITE_QUEUED | WRITE_ACTIVE
- const WRITE_DRAIN_STATUS = WRITE_QUEUED | WRITE_UNDRAINED | OPEN_STATUS | WRITE_ACTIVE
- const WRITE_STATUS = OPEN_STATUS | WRITE_ACTIVE | WRITE_QUEUED
- const WRITE_PRIMARY_AND_ACTIVE = WRITE_PRIMARY | WRITE_ACTIVE
- const WRITE_ACTIVE_AND_WRITING = WRITE_ACTIVE | WRITE_WRITING
- const WRITE_FINISHING_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_QUEUED_AND_ACTIVE | WRITE_DONE
- const WRITE_BACKPRESSURE_STATUS = WRITE_UNDRAINED | DESTROY_STATUS | WRITE_FINISHING | WRITE_DONE
- const WRITE_UPDATE_SYNC_STATUS = WRITE_UPDATING | OPEN_STATUS | WRITE_NEXT_TICK | WRITE_PRIMARY
- const asyncIterator = Symbol.asyncIterator || Symbol('asyncIterator') // falls back to a private symbol when Symbol.asyncIterator is missing
/**
 * Write-side bookkeeping for a stream: the queue of pending chunks, the
 * buffered size used for backpressure, and the update loop that feeds
 * `_write`, `_final`, `_destroy` and `_open` according to the flag bits kept
 * in `stream._duplexState`.
 */
class WritableState {
  /**
   * @param {object} stream owning stream; its `_duplexState` is read and written here
   * @param {object} [opts]
   * @param {number} [opts.highWaterMark=16384] buffered size at which `push` starts returning false
   * @param {Function|null} [opts.map] applied to every pushed chunk (`mapWritable` takes precedence)
   * @param {Function} [opts.byteLength] size of a chunk (`byteLengthWritable` takes precedence)
   */
  constructor (stream, { highWaterMark = 16384, map = null, mapWritable, byteLength, byteLengthWritable } = {}) {
    this.stream = stream
    this.queue = new FIFO()
    this.highWaterMark = highWaterMark
    this.buffered = 0
    this.error = null
    this.pipeline = null
    this.drains = null // if more seldom-used helpers are added they could move into a sub-object so this stays a single pointer
    this.byteLength = byteLengthWritable || byteLength || defaultByteLength
    this.map = mapWritable || map
    this.afterWrite = afterWrite.bind(this)
    this.afterUpdateNextTick = updateWriteNT.bind(this)
  }

  /** True once the WRITE_DONE bit is set on the stream. */
  get ended () {
    const doneBit = this.stream._duplexState & WRITE_DONE
    return doneBit !== 0
  }

  /**
   * Queue a chunk and flag the stream as having queued writes.
   * @returns {boolean} false when the buffered size reached the high water mark
   */
  push (data) {
    const chunk = this.map !== null ? this.map(data) : data
    this.buffered += this.byteLength(chunk)
    this.queue.push(chunk)

    const belowMark = this.buffered < this.highWaterMark
    this.stream._duplexState |= belowMark ? WRITE_QUEUED : WRITE_QUEUED_AND_UNDRAINED
    return belowMark
  }

  /** Remove and return the oldest chunk; clears WRITE_QUEUED when nothing is buffered. */
  shift () {
    const chunk = this.queue.shift()
    this.buffered -= this.byteLength(chunk)
    if (this.buffered === 0) {
      this.stream._duplexState &= WRITE_NOT_QUEUED
    }
    return chunk
  }

  /**
   * Mark the write side as finishing. A function argument is registered as a
   * one-time 'finish' listener; any other non-nullish argument is pushed first.
   */
  end (data) {
    const stream = this.stream
    if (typeof data === 'function') {
      stream.once('finish', data)
    } else if (data !== undefined && data !== null) {
      this.push(data)
    }
    stream._duplexState = (stream._duplexState | WRITE_FINISHING) & WRITE_NON_PRIMARY
  }

  /**
   * Default `_write` behaviour: collect this chunk plus everything still
   * queued into one batch and hand it to `_writev`.
   */
  autoBatch (data, cb) {
    const stream = this.stream
    const batch = [data]

    while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED_AND_ACTIVE) {
      batch.push(stream._writableState.shift())
    }

    // Any open/destroy status bit set: complete without writing.
    if ((stream._duplexState & OPEN_STATUS) !== 0) return cb(null)
    stream._writev(batch, cb)
  }

  /** Run the write loop until no further update was requested while it ran. */
  update () {
    const stream = this.stream

    stream._duplexState |= WRITE_UPDATING

    do {
      while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED) {
        const chunk = this.shift()
        stream._duplexState |= WRITE_ACTIVE_AND_WRITING
        stream._write(chunk, this.afterWrite)
      }

      const primaryOrActive = stream._duplexState & WRITE_PRIMARY_AND_ACTIVE
      if (primaryOrActive === 0) this.updateNonPrimary()
    } while (this.continueUpdate() === true)

    stream._duplexState &= WRITE_NOT_UPDATING
  }

  /** Handle the finishing, destroying and opening transitions, in that order. */
  updateNonPrimary () {
    const stream = this.stream

    if ((stream._duplexState & WRITE_FINISHING_STATUS) === WRITE_FINISHING) {
      stream._duplexState = (stream._duplexState | WRITE_ACTIVE) & WRITE_NOT_FINISHING
      stream._final(afterFinal.bind(this))
      return
    }

    if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
      // Only start _destroy when no update tick is pending on either side.
      if ((stream._duplexState & ACTIVE_OR_TICKING) !== 0) return
      stream._duplexState |= ACTIVE
      stream._destroy(afterDestroy.bind(this))
      return
    }

    if ((stream._duplexState & IS_OPENING) === OPENING) {
      stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
      stream._open(afterOpen.bind(this))
    }
  }

  /** Consume a pending WRITE_NEXT_TICK request; returns whether there was one. */
  continueUpdate () {
    const stream = this.stream
    if ((stream._duplexState & WRITE_NEXT_TICK) === 0) return false
    stream._duplexState &= WRITE_NOT_NEXT_TICK
    return true
  }

  /** Update synchronously when only WRITE_PRIMARY is set, otherwise defer to the next tick. */
  updateCallback () {
    const syncBits = this.stream._duplexState & WRITE_UPDATE_SYNC_STATUS
    if (syncBits === WRITE_PRIMARY) {
      this.update()
    } else {
      this.updateNextTick()
    }
  }

  /** Request an update on the next tick unless one is already requested. */
  updateNextTick () {
    const stream = this.stream
    if ((stream._duplexState & WRITE_NEXT_TICK) !== 0) return
    stream._duplexState |= WRITE_NEXT_TICK
    // While update() is running it picks the request up via continueUpdate().
    if ((stream._duplexState & WRITE_UPDATING) === 0) queueTick(this.afterUpdateNextTick)
  }
}
/**
 * Read-side bookkeeping for a stream: the queue of readable chunks, the
 * buffered size used for backpressure, the optional pipe destination, and the
 * update loop that drives `_read`, 'readable'/'end' emission, `_destroy` and
 * `_open` according to the flag bits kept in `stream._duplexState`.
 */
class ReadableState {
  /**
   * @param {object} stream owning stream; its `_duplexState` is read and written here
   * @param {object} [opts]
   * @param {number} [opts.highWaterMark=16384] target buffered size; 0 disables read-ahead
   * @param {Function|null} [opts.map] applied to every pushed chunk (`mapReadable` takes precedence)
   * @param {Function} [opts.byteLength] size of a chunk (`byteLengthReadable` takes precedence)
   */
  constructor (stream, { highWaterMark = 16384, map = null, mapReadable, byteLength, byteLengthReadable } = {}) {
    this.stream = stream
    this.queue = new FIFO()
    // A mark of 0 is stored as 1 so the read loop can still run; read-ahead is switched off instead.
    this.highWaterMark = highWaterMark === 0 ? 1 : highWaterMark
    this.buffered = 0
    this.readAhead = highWaterMark > 0
    this.error = null
    this.pipeline = null
    this.byteLength = byteLengthReadable || byteLength || defaultByteLength
    this.map = mapReadable || map
    this.pipeTo = null
    this.afterRead = afterRead.bind(this)
    this.afterUpdateNextTick = updateReadNT.bind(this)
  }

  /** True once the READ_DONE bit is set on the stream. */
  get ended () {
    return (this.stream._duplexState & READ_DONE) !== 0
  }

  /**
   * Attach a single pipe destination and wire its lifecycle events.
   * @param {object} pipeTo destination stream
   * @param {Function} [cb] called by the Pipeline when both sides are done
   * @throws {Error} if a destination is already attached
   */
  pipe (pipeTo, cb) {
    if (this.pipeTo !== null) throw new Error('Can only pipe to one destination')
    if (typeof cb !== 'function') cb = null

    this.stream._duplexState |= READ_PIPE_DRAINED
    this.pipeTo = pipeTo
    this.pipeline = new Pipeline(this.stream, pipeTo, cb)

    if (cb) this.stream.on('error', noop) // We already error handle this so suppress crashes

    if (isStreamx(pipeTo)) {
      pipeTo._writableState.pipeline = this.pipeline
      if (cb) pipeTo.on('error', noop) // We already error handle this so suppress crashes
      pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline)) // TODO: just call finished from pipeTo itself
    } else {
      const onerror = this.pipeline.done.bind(this.pipeline, pipeTo)
      const onclose = this.pipeline.done.bind(this.pipeline, pipeTo, null) // onclose has a weird bool arg
      pipeTo.on('error', onerror)
      pipeTo.on('close', onclose)
      pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline))
    }

    pipeTo.on('drain', afterDrain.bind(this))
    this.stream.emit('piping', pipeTo)
    pipeTo.emit('pipe', this.stream)
  }

  /**
   * Queue a chunk, or mark the stream as ending when `data` is null.
   * @returns {boolean} true while the buffered size is below the high water mark
   */
  push (data) {
    const stream = this.stream

    if (data === null) {
      this.highWaterMark = 0
      stream._duplexState = (stream._duplexState | READ_ENDING) & READ_NON_PRIMARY_AND_PUSHED
      return false
    }

    if (this.map !== null) data = this.map(data)
    this.buffered += this.byteLength(data)
    this.queue.push(data)

    stream._duplexState = (stream._duplexState | READ_QUEUED) & READ_PUSHED

    return this.buffered < this.highWaterMark
  }

  /** Remove and return the oldest chunk; clears the queued flags when nothing is buffered. */
  shift () {
    const data = this.queue.shift()

    this.buffered -= this.byteLength(data)
    if (this.buffered === 0) this.stream._duplexState &= READ_NOT_QUEUED
    return data
  }

  /**
   * Put `data` at the front of the queue by draining the queue and re-adding
   * everything behind the new chunk.
   */
  unshift (data) {
    const stream = this.stream
    const pending = [this.map !== null ? this.map(data) : data]
    while (this.buffered > 0) pending.push(this.shift())

    const lastIndex = pending.length - 1
    for (let i = 0; i < lastIndex; i++) {
      const item = pending[i]
      this.buffered += this.byteLength(item)
      this.queue.push(item)
    }

    const tail = pending[lastIndex]
    if (tail === null) {
      // Keep the end-of-stream handling of push(null); push() never maps null.
      this.push(null)
      return
    }

    // Every pending item has already been through `map` (either just above or
    // when it was first pushed), so the tail is enqueued directly. Routing it
    // through push() would apply `map` a second time.
    this.buffered += this.byteLength(tail)
    this.queue.push(tail)
    stream._duplexState = (stream._duplexState | READ_QUEUED) & READ_PUSHED
  }

  /**
   * Take one chunk if any is queued, forwarding it to the pipe destination
   * and 'data' listeners.
   * @returns {*} the chunk, or null when nothing is available
   */
  read () {
    const stream = this.stream

    if ((stream._duplexState & READ_STATUS) === READ_QUEUED) {
      const data = this.shift()
      if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED
      if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)
      return data
    }

    if (this.readAhead === false) {
      // Without read-ahead, an explicit read() is what requests the next _read.
      stream._duplexState |= READ_READ_AHEAD
      this.updateNextTick()
    }

    return null
  }

  /** Flush queued chunks to the pipe destination / 'data' listeners while flowing. */
  drain () {
    const stream = this.stream

    while ((stream._duplexState & READ_STATUS) === READ_QUEUED && (stream._duplexState & READ_FLOWING) !== 0) {
      const data = this.shift()
      if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED
      if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)
    }
  }

  /** Run the read loop until no further update was requested while it ran. */
  update () {
    const stream = this.stream

    stream._duplexState |= READ_UPDATING

    do {
      this.drain()

      while (this.buffered < this.highWaterMark && (stream._duplexState & SHOULD_NOT_READ) === READ_READ_AHEAD) {
        stream._duplexState |= READ_ACTIVE_AND_NEEDS_PUSH
        stream._read(this.afterRead)
        this.drain()
      }

      if ((stream._duplexState & READ_READABLE_STATUS) === READ_EMIT_READABLE_AND_QUEUED) {
        stream._duplexState |= READ_EMITTED_READABLE
        stream.emit('readable')
      }

      if ((stream._duplexState & READ_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
    } while (this.continueUpdate() === true)

    stream._duplexState &= READ_NOT_UPDATING
  }

  /** Handle the ending, destroying and opening transitions, in that order. */
  updateNonPrimary () {
    const stream = this.stream

    if ((stream._duplexState & READ_ENDING_STATUS) === READ_ENDING) {
      stream._duplexState = (stream._duplexState | READ_DONE) & READ_NOT_ENDING
      stream.emit('end')
      if ((stream._duplexState & AUTO_DESTROY) === DONE) stream._duplexState |= DESTROYING
      if (this.pipeTo !== null) this.pipeTo.end()
    }

    if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
      // Only start _destroy when no update tick is pending on either side.
      if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {
        stream._duplexState |= ACTIVE
        stream._destroy(afterDestroy.bind(this))
      }
      return
    }

    if ((stream._duplexState & IS_OPENING) === OPENING) {
      stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
      stream._open(afterOpen.bind(this))
    }
  }

  /** Consume a pending READ_NEXT_TICK request; returns whether there was one. */
  continueUpdate () {
    if ((this.stream._duplexState & READ_NEXT_TICK) === 0) return false
    this.stream._duplexState &= READ_NOT_NEXT_TICK
    return true
  }

  /** Update synchronously when only READ_PRIMARY is set, otherwise defer to the next tick. */
  updateCallback () {
    if ((this.stream._duplexState & READ_UPDATE_SYNC_STATUS) === READ_PRIMARY) this.update()
    else this.updateNextTick()
  }

  /** Request an update on the next tick unless one is already requested. */
  updateNextTick () {
    if ((this.stream._duplexState & READ_NEXT_TICK) !== 0) return
    this.stream._duplexState |= READ_NEXT_TICK
    // While update() is running it picks the request up via continueUpdate().
    if ((this.stream._duplexState & READ_UPDATING) === 0) queueTick(this.afterUpdateNextTick)
  }
}
/**
 * Per-Transform scratch state: a chunk held back while the readable side is
 * full, the bound completion callback for `_transform`, and the pending
 * `_final` callback.
 */
class TransformState {
  /** @param {object} stream the Transform that `afterTransform` is bound to */
  constructor (stream) {
    const boundAfterTransform = afterTransform.bind(stream)

    this.data = null
    this.afterTransform = boundAfterTransform
    this.afterFinal = null
  }
}
/**
 * Tracks one `from -> to` pipe and invokes the completion callback once both
 * ends have reported in through `done()`.
 */
class Pipeline {
  /**
   * @param {object} src readable side of the pipe
   * @param {object} dst writable side of the pipe
   * @param {Function|null} cb called with the recorded error (or null) when both sides are done
   */
  constructor (src, dst, cb) {
    this.from = src
    this.to = dst
    this.afterPipe = cb
    this.error = null
    this.pipeToFinished = false
  }

  /** Record that the destination emitted 'finish'. */
  finished () {
    this.pipeToFinished = true
  }

  /**
   * Report that one side of the pipe is done. The first side to finish may
   * cause the other to be destroyed; the second triggers the callback.
   * @param {object} stream the side that finished
   * @param {Error|null} [err] error to remember for the callback
   */
  done (stream, err) {
    if (err) this.error = err

    if (stream === this.to) {
      this.to = null

      const source = this.from
      if (source !== null) {
        const endedCleanly = (source._duplexState & READ_DONE) !== 0 && this.pipeToFinished
        if (!endedCleanly) {
          source.destroy(this.error || new Error('Writable stream closed prematurely'))
        }
        return
      }
    }

    if (stream === this.from) {
      this.from = null

      const target = this.to
      if (target !== null) {
        const sourceEnded = (stream._duplexState & READ_DONE) !== 0
        if (!sourceEnded) {
          target.destroy(this.error || new Error('Readable stream closed before ending'))
        }
        return
      }
    }

    // Both sides have reported: notify once and drop all references.
    if (this.afterPipe !== null) this.afterPipe(this.error)
    this.afterPipe = null
    this.from = null
    this.to = null
  }
}
/**
 * 'drain' listener for a pipe destination; called with a ReadableState as
 * `this`. Marks the pipe as drained again and schedules a read update.
 */
function afterDrain () {
  const stream = this.stream
  stream._duplexState = stream._duplexState | READ_PIPE_DRAINED
  this.updateCallback()
}
/**
 * Completion callback for `_final`; called with a WritableState as `this`.
 * Emits 'finish' unless the stream is being destroyed, triggers auto-destroy
 * when both sides are done, then continues the write update loop.
 * @param {Error|null} err error reported by `_final`
 */
function afterFinal (err) {
  const stream = this.stream
  if (err) stream.destroy(err)

  const beingDestroyed = (stream._duplexState & DESTROY_STATUS) !== 0
  if (!beingDestroyed) {
    stream._duplexState |= WRITE_DONE
    stream.emit('finish')
  }

  const bothDone = (stream._duplexState & AUTO_DESTROY) === DONE
  if (bothDone) stream._duplexState |= DESTROYING

  stream._duplexState &= WRITE_NOT_ACTIVE

  // no need to wait the extra tick here, so we short circuit that
  if ((stream._duplexState & WRITE_UPDATING) !== 0) {
    this.updateNextTick()
  } else {
    this.update()
  }
}
/**
 * Completion callback for `_destroy`; called with a ReadableState or
 * WritableState as `this`. Emits 'error' (when there is an explicit error) and
 * 'close', resolves pending drain waiters with false, and notifies pipelines.
 * @param {Error|null} err error reported by `_destroy`
 */
function afterDestroy (err) {
  const stream = this.stream

  // Fall back to the error recorded on the state, unless it is only the
  // generic "destroyed" sentinel.
  let failure = err
  if (!failure && this.error !== STREAM_DESTROYED) failure = this.error

  if (failure) stream.emit('error', failure)
  stream._duplexState |= DESTROYED
  stream.emit('close')

  const readState = stream._readableState
  const writeState = stream._writableState

  if (readState !== null && readState.pipeline !== null) {
    readState.pipeline.done(stream, failure)
  }

  if (writeState === null) return

  while (writeState.drains !== null && writeState.drains.length > 0) {
    const waiter = writeState.drains.shift()
    waiter.resolve(false)
  }
  if (writeState.pipeline !== null) writeState.pipeline.done(stream, failure)
}
/**
 * Completion callback for `_write`; called with a WritableState as `this`.
 * Clears the active/writing bits, ticks drain waiters, emits 'drain' when the
 * queue has emptied after backpressure, and continues the update loop.
 * @param {Error|null} err error reported by `_write`
 */
function afterWrite (err) {
  const stream = this.stream

  if (err) stream.destroy(err)
  stream._duplexState &= WRITE_NOT_ACTIVE

  if (this.drains !== null) tickDrains(this.drains)

  const onlyUndrained = (stream._duplexState & WRITE_DRAIN_STATUS) === WRITE_UNDRAINED
  if (onlyUndrained) {
    stream._duplexState &= WRITE_DRAINED
    const wantsDrainEvent = (stream._duplexState & WRITE_EMIT_DRAIN) === WRITE_EMIT_DRAIN
    if (wantsDrainEvent) stream.emit('drain')
  }

  this.updateCallback()
}
/**
 * Completion callback for `_read`; called with a ReadableState as `this`.
 * Clears READ_ACTIVE, drops the read-ahead request for paused streams that do
 * not read ahead, and continues the update loop.
 * @param {Error|null} err error reported by `_read`
 */
function afterRead (err) {
  const stream = this.stream

  if (err) stream.destroy(err)
  stream._duplexState &= READ_NOT_ACTIVE

  const paused = (stream._duplexState & READ_RESUMED) === 0
  if (this.readAhead === false && paused) {
    stream._duplexState &= READ_NO_READ_AHEAD
  }

  this.updateCallback()
}
/**
 * Next-tick entry point for the read update loop; called with a ReadableState
 * as `this`. Does nothing when an update is already running.
 */
function updateReadNT () {
  const stream = this.stream
  const alreadyUpdating = (stream._duplexState & READ_UPDATING) !== 0
  if (alreadyUpdating) return

  stream._duplexState &= READ_NOT_NEXT_TICK
  this.update()
}
/**
 * Next-tick entry point for the write update loop; called with a
 * WritableState as `this`. Does nothing when an update is already running.
 */
function updateWriteNT () {
  const stream = this.stream
  const alreadyUpdating = (stream._duplexState & WRITE_UPDATING) !== 0
  if (alreadyUpdating) return

  stream._duplexState &= WRITE_NOT_NEXT_TICK
  this.update()
}
/**
 * Count one completed write against every drain waiter and resolve (with
 * true) those whose counter reached zero.
 * @param {Array<{writes: number, resolve: Function}>} drains waiters, mutated in place
 */
function tickDrains (drains) {
  let index = 0
  while (index < drains.length) {
    const waiter = drains[index]
    waiter.writes -= 1

    if (waiter.writes === 0) {
      // drains.writes are monotonic, so if one is 0 it is always the first one.
      // The index is not advanced because the array just shrank by one.
      drains.shift().resolve(true)
    } else {
      index += 1
    }
  }
}
/**
 * Completion callback for `_open`; called with a ReadableState or
 * WritableState as `this`. Promotes the read/write sides to their primary
 * state and emits 'open' unless the stream is being destroyed, then kicks both
 * update loops.
 * @param {Error|null} err error reported by `_open`
 */
function afterOpen (err) {
  const stream = this.stream

  if (err) stream.destroy(err)

  const beingDestroyed = (stream._duplexState & DESTROYING) !== 0
  if (!beingDestroyed) {
    if ((stream._duplexState & READ_PRIMARY_STATUS) === 0) stream._duplexState |= READ_PRIMARY
    if ((stream._duplexState & WRITE_PRIMARY_STATUS) === 0) stream._duplexState |= WRITE_PRIMARY
    stream.emit('open')
  }

  stream._duplexState &= NOT_ACTIVE

  const writeState = stream._writableState
  if (writeState !== null) writeState.updateCallback()

  const readState = stream._readableState
  if (readState !== null) readState.updateCallback()
}
/**
 * Completion callback for `_transform`; called with the Transform stream as
 * `this`. Pushes the produced chunk (if any) to the readable side and then
 * completes the pending write.
 * @param {Error|null} err error reported by `_transform`
 * @param {*} [data] transformed chunk; null/undefined means nothing to push
 */
function afterTransform (err, data) {
  const produced = data !== undefined && data !== null
  if (produced) {
    this.push(data)
  }
  this._writableState.afterWrite(err)
}
/**
 * 'newListener' handler installed on every Stream (called with the stream as
 * `this`). Adding a 'data', 'readable' or 'drain' listener sets the matching
 * emit flag and schedules an update of the relevant side.
 * @param {string|symbol} name event name the listener is being added for
 */
function newListener (name) {
  const readState = this._readableState
  const writeState = this._writableState

  switch (name) {
    case 'data':
      if (readState !== null) {
        // A 'data' listener also resumes the stream and enables read-ahead.
        this._duplexState |= (READ_EMIT_DATA | READ_RESUMED_READ_AHEAD)
        readState.updateNextTick()
      }
      break
    case 'readable':
      if (readState !== null) {
        this._duplexState |= READ_EMIT_READABLE
        readState.updateNextTick()
      }
      break
    case 'drain':
      if (writeState !== null) {
        this._duplexState |= WRITE_EMIT_DRAIN
        writeState.updateNextTick()
      }
      break
  }
}
/**
 * Base class shared by all streams. Holds the packed `_duplexState` flags and
 * the optional read/write state objects, and implements `destroy()`.
 */
class Stream extends EventEmitter {
  /**
   * @param {object} [opts]
   * @param {Function} [opts.open] overrides `_open`
   * @param {Function} [opts.destroy] overrides `_destroy`
   * @param {Function} [opts.predestroy] overrides `_predestroy`
   * @param {object} [opts.signal] object with `addEventListener`; its 'abort' event destroys the stream
   */
  constructor (opts) {
    super()

    this._duplexState = 0
    this._readableState = null
    this._writableState = null

    if (opts) {
      const { open, destroy, predestroy, signal } = opts
      if (open) this._open = open
      if (destroy) this._destroy = destroy
      if (predestroy) this._predestroy = predestroy
      if (signal) signal.addEventListener('abort', abort.bind(this))
    }

    this.on('newListener', newListener)
  }

  /** Default open hook: completes immediately. */
  _open (cb) {
    cb(null)
  }

  /** Default destroy hook: completes immediately. */
  _destroy (cb) {
    cb(null)
  }

  /** Default predestroy hook. */
  _predestroy () {
    // does nothing
  }

  /** `true` when a readable state is attached, otherwise `undefined`. */
  get readable () {
    if (this._readableState === null) return undefined
    return true
  }

  /** `true` when a writable state is attached, otherwise `undefined`. */
  get writable () {
    if (this._writableState === null) return undefined
    return true
  }

  /** Whether the DESTROYED bit is set. */
  get destroyed () {
    const bit = this._duplexState & DESTROYED
    return bit !== 0
  }

  /** Whether any destroy-related bit (predestroying, destroying, destroyed) is set. */
  get destroying () {
    const bits = this._duplexState & DESTROY_STATUS
    return bits !== 0
  }

  /**
   * Begin destroying the stream. Calling it again once destruction started is
   * a no-op.
   * @param {Error} [err] reason; defaults to the shared STREAM_DESTROYED sentinel
   */
  destroy (err) {
    if ((this._duplexState & DESTROY_STATUS) !== 0) return

    const reason = err || STREAM_DESTROYED
    this._duplexState = (this._duplexState | DESTROYING) & NON_PRIMARY

    // Dropping the high water marks to 0 is what the visible code does here;
    // the recorded error is later read by afterDestroy/getStreamError.
    if (this._readableState !== null) {
      this._readableState.highWaterMark = 0
      this._readableState.error = reason
    }
    if (this._writableState !== null) {
      this._writableState.highWaterMark = 0
      this._writableState.error = reason
    }

    // PREDESTROYING is only set for the duration of the hook.
    this._duplexState |= PREDESTROYING
    this._predestroy()
    this._duplexState &= NOT_PREDESTROYING

    if (this._readableState !== null) this._readableState.updateNextTick()
    if (this._writableState !== null) this._writableState.updateNextTick()
  }
}
/**
 * Readable stream. Subclasses or `opts.read` supply `_read(cb)`, which calls
 * `push()` with data (or null to end) and then `cb`.
 */
class Readable extends Stream {
  /**
   * @param {object} [opts] forwarded to Stream and ReadableState
   * @param {Function} [opts.read] overrides `_read`
   * @param {boolean} [opts.eagerOpen] schedule an update (and thus `_open`) right away
   */
  constructor (opts) {
    super(opts)

    // A pure Readable has no write side, so it starts with WRITE_DONE set.
    this._duplexState |= OPENING | WRITE_DONE | READ_READ_AHEAD
    this._readableState = new ReadableState(this, opts)

    if (opts) {
      if (this._readableState.readAhead === false) this._duplexState &= READ_NO_READ_AHEAD
      if (opts.read) this._read = opts.read
      if (opts.eagerOpen) this._readableState.updateNextTick()
    }
  }

  /** Default read hook: produces nothing. */
  _read (cb) {
    cb(null)
  }

  /**
   * Pipe into `dest`.
   * @param {object} dest destination stream
   * @param {Function} [cb] called when the pipe has fully completed
   * @returns {object} `dest`
   */
  pipe (dest, cb) {
    this._readableState.updateNextTick()
    this._readableState.pipe(dest, cb)
    return dest
  }

  /** Return the next queued chunk, or null when none is available. */
  read () {
    this._readableState.updateNextTick()
    return this._readableState.read()
  }

  /**
   * Queue a chunk for reading; null ends the stream.
   * @returns {boolean} true while the buffer is below the high water mark
   */
  push (data) {
    this._readableState.updateNextTick()
    return this._readableState.push(data)
  }

  /** Put a chunk back at the front of the read queue. */
  unshift (data) {
    this._readableState.updateNextTick()
    return this._readableState.unshift(data)
  }

  /** Switch to flowing mode. */
  resume () {
    this._duplexState |= READ_RESUMED_READ_AHEAD
    this._readableState.updateNextTick()
    return this
  }

  /** Leave flowing mode; also clears the read-ahead request when read-ahead is disabled. */
  pause () {
    this._duplexState &= (this._readableState.readAhead === false ? READ_PAUSED_NO_READ_AHEAD : READ_PAUSED)
    return this
  }

  /**
   * Wrap an async iterator in a Readable. `ite.return()` is invoked on
   * predestroy and awaited in `_destroy`.
   */
  static _fromAsyncIterator (ite, opts) {
    let destroy

    const rs = new Readable({
      ...opts,
      read (cb) {
        ite.next().then(push).then(cb.bind(null, null)).catch(cb)
      },
      predestroy () {
        destroy = ite.return()
      },
      destroy (cb) {
        if (!destroy) return cb(null)
        destroy.then(cb.bind(null, null)).catch(cb)
      }
    })

    return rs

    function push (data) {
      if (data.done) rs.push(null)
      else rs.push(data.value)
    }
  }

  /**
   * Create a Readable from a streamx readable (returned as-is), an async
   * iterable, an array, or a single value. `undefined` yields an empty stream.
   * @param {*} data
   * @param {object} [opts] forwarded to the Readable constructor
   * @returns {Readable}
   */
  static from (data, opts) {
    // The stream and async-iterator probes read properties of `data`, which
    // throws a TypeError for null/undefined; skip them so the fallback below
    // (including its `undefined` case) is actually reachable.
    if (data !== undefined && data !== null) {
      if (isReadStreamx(data)) return data
      if (data[asyncIterator]) return this._fromAsyncIterator(data[asyncIterator](), opts)
    }
    if (!Array.isArray(data)) data = data === undefined ? [] : [data]

    let i = 0
    return new Readable({
      ...opts,
      read (cb) {
        this.push(i === data.length ? null : data[i++])
        cb(null)
      }
    })
  }

  /** Whether `rs` should not be pushed to right now (ending/done/destroying or buffer full). */
  static isBackpressured (rs) {
    return (rs._duplexState & READ_BACKPRESSURE_STATUS) !== 0 || rs._readableState.buffered >= rs._readableState.highWaterMark
  }

  /** Whether `rs` is not in flowing mode. */
  static isPaused (rs) {
    return (rs._duplexState & READ_RESUMED) === 0
  }

  /**
   * Async iterator over the stream's chunks. `return()`/`throw()` destroy the
   * stream and settle once it has closed.
   */
  [asyncIterator] () {
    const stream = this

    let error = null
    let promiseResolve = null
    let promiseReject = null

    this.on('error', (err) => { error = err })
    this.on('readable', onreadable)
    this.on('close', onclose)

    return {
      [asyncIterator] () {
        return this
      },
      next () {
        return new Promise(function (resolve, reject) {
          promiseResolve = resolve
          promiseReject = reject
          const data = stream.read()
          if (data !== null) ondata(data)
          else if ((stream._duplexState & DESTROYED) !== 0) ondata(null)
        })
      },
      return () {
        return destroy(null)
      },
      throw (err) {
        return destroy(err)
      }
    }

    function onreadable () {
      if (promiseResolve !== null) ondata(stream.read())
    }

    function onclose () {
      if (promiseResolve !== null) ondata(null)
    }

    // Settle the pending next() promise, if any.
    function ondata (data) {
      if (promiseReject === null) return
      if (error) promiseReject(error)
      else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED)
      else promiseResolve({ value: data, done: data === null })
      promiseReject = promiseResolve = null
    }

    function destroy (err) {
      stream.destroy(err)
      return new Promise((resolve, reject) => {
        if (stream._duplexState & DESTROYED) return resolve({ value: undefined, done: true })
        stream.once('close', function () {
          if (err) reject(err)
          else resolve({ value: undefined, done: true })
        })
      })
    }
  }
}
/**
 * Writable stream. Subclasses or options supply `_write(data, cb)`,
 * `_writev(batch, cb)` and/or `_final(cb)`.
 */
class Writable extends Stream {
  /**
   * @param {object} [opts] forwarded to Stream and WritableState
   * @param {Function} [opts.writev] overrides `_writev`
   * @param {Function} [opts.write] overrides `_write`
   * @param {Function} [opts.final] overrides `_final`
   * @param {boolean} [opts.eagerOpen] schedule an update (and thus `_open`) right away
   */
  constructor (opts) {
    super(opts)

    // A pure Writable has no read side, so it starts with READ_DONE set.
    this._duplexState |= OPENING | READ_DONE
    this._writableState = new WritableState(this, opts)

    if (!opts) return
    if (opts.writev) this._writev = opts.writev
    if (opts.write) this._write = opts.write
    if (opts.final) this._final = opts.final
    if (opts.eagerOpen) this._writableState.updateNextTick()
  }

  /** Default batch-write hook: discards the batch. */
  _writev (batch, cb) {
    cb(null)
  }

  /** Default write hook: batches queued chunks into a single `_writev` call. */
  _write (data, cb) {
    this._writableState.autoBatch(data, cb)
  }

  /** Default final hook: completes immediately. */
  _final (cb) {
    cb(null)
  }

  /** Whether `ws` is undrained, finishing, done or being destroyed. */
  static isBackpressured (ws) {
    const bits = ws._duplexState & WRITE_BACKPRESSURE_STATUS
    return bits !== 0
  }

  /**
   * Promise that resolves to true once every currently pending write has been
   * processed, or to false if the stream is (or becomes) destroyed.
   * @returns {Promise<boolean>}
   */
  static drained (ws) {
    if (ws.destroyed) return Promise.resolve(false)

    const state = ws._writableState
    // With a custom _writev the whole queue goes out as one batch, i.e. at most one pending write.
    const pending = isWritev(ws) ? Math.min(1, state.queue.length) : state.queue.length
    const inFlight = (ws._duplexState & WRITE_WRITING) ? 1 : 0
    const writes = pending + inFlight

    if (writes === 0) return Promise.resolve(true)
    if (state.drains === null) state.drains = []

    return new Promise((resolve) => {
      state.drains.push({ writes, resolve })
    })
  }

  /**
   * Queue a chunk for writing.
   * @returns {boolean} false when the caller should wait for 'drain'
   */
  write (data) {
    const state = this._writableState
    state.updateNextTick()
    return state.push(data)
  }

  /** Finish the write side; see WritableState#end for the accepted argument. */
  end (data) {
    const state = this._writableState
    state.updateNextTick()
    state.end(data)
    return this
  }
}
/**
 * Stream that is both readable and writable. Inherits from Readable and
 * re-implements the Writable surface on top of its own WritableState.
 */
class Duplex extends Readable { // and Writable
  /**
   * @param {object} [opts] forwarded to Readable and WritableState
   * @param {Function} [opts.writev] overrides `_writev`
   * @param {Function} [opts.write] overrides `_write`
   * @param {Function} [opts.final] overrides `_final`
   */
  constructor (opts) {
    super(opts)

    // Reset the flags set by Readable (notably WRITE_DONE), keeping only the read-ahead bit.
    const readAheadBit = this._duplexState & READ_READ_AHEAD
    this._duplexState = OPENING | readAheadBit
    this._writableState = new WritableState(this, opts)

    if (!opts) return
    if (opts.writev) this._writev = opts.writev
    if (opts.write) this._write = opts.write
    if (opts.final) this._final = opts.final
  }

  /** Default batch-write hook: discards the batch. */
  _writev (batch, cb) {
    cb(null)
  }

  /** Default write hook: batches queued chunks into a single `_writev` call. */
  _write (data, cb) {
    this._writableState.autoBatch(data, cb)
  }

  /** Default final hook: completes immediately. */
  _final (cb) {
    cb(null)
  }

  /**
   * Queue a chunk for writing.
   * @returns {boolean} false when the caller should wait for 'drain'
   */
  write (data) {
    const state = this._writableState
    state.updateNextTick()
    return state.push(data)
  }

  /** Finish the write side; see WritableState#end for the accepted argument. */
  end (data) {
    const state = this._writableState
    state.updateNextTick()
    state.end(data)
    return this
  }
}
/**
 * Duplex whose readable output is produced from its writable input through
 * `_transform(data, cb)`, with an optional `_flush(cb)` before ending.
 */
class Transform extends Duplex {
  /**
   * @param {object} [opts] forwarded to Duplex
   * @param {Function} [opts.transform] overrides `_transform`
   * @param {Function} [opts.flush] overrides `_flush`
   */
  constructor (opts) {
    super(opts)
    this._transformState = new TransformState(this)

    if (!opts) return
    if (opts.transform) this._transform = opts.transform
    if (opts.flush) this._flush = opts.flush
  }

  /**
   * Transform the chunk now, or park it while the readable buffer is full.
   * A parked chunk is picked up by the next `_read`.
   */
  _write (data, cb) {
    const readState = this._readableState
    const hasRoom = readState.buffered < readState.highWaterMark

    if (hasRoom) {
      this._transform(data, this._transformState.afterTransform)
      return
    }
    this._transformState.data = data
  }

  /** Complete the read, then transform the parked chunk if there is one. */
  _read (cb) {
    const state = this._transformState
    const parked = state.data

    if (parked === null) {
      cb(null)
      return
    }

    state.data = null
    cb(null)
    this._transform(parked, state.afterTransform)
  }

  /** Destroy the stream and release any parked chunk by completing its write. */
  destroy (err) {
    super.destroy(err)

    const state = this._transformState
    if (state.data === null) return
    state.data = null
    state.afterTransform()
  }

  /** Default transform: passes the chunk through unchanged. */
  _transform (data, cb) {
    cb(null, data)
  }

  /** Default flush hook: produces nothing. */
  _flush (cb) {
    cb(null)
  }

  /** Run `_flush`, then end the readable side before completing `_final`. */
  _final (cb) {
    this._transformState.afterFinal = cb
    const onflushed = transformAfterFlush.bind(this)
    this._flush(onflushed)
  }
}
- class PassThrough extends Transform {} // adds nothing of its own: inherits Transform's default _transform, which forwards each chunk unchanged
/**
 * Completion callback for `_flush`; called with the Transform as `this`.
 * Pushes the flushed chunk (if any), ends the readable side, and completes the
 * pending `_final` callback.
 * @param {Error|null} err error reported by `_flush`
 * @param {*} [data] final chunk; null/undefined means nothing to push
 */
function transformAfterFlush (err, data) {
  const finish = this._transformState.afterFinal
  if (err) return finish(err)

  const produced = data !== null && data !== undefined
  if (produced) this.push(data)

  this.push(null)
  finish(null)
}
/**
 * Promise flavour of `pipeline()`: resolves with no value when the pipeline
 * completes, rejects with the error otherwise.
 * @param {...*} streams forwarded to `pipeline()`
 * @returns {Promise<void>}
 */
function pipelinePromise (...streams) {
  return new Promise((resolve, reject) => {
    const onfinished = (err) => {
      if (err) return reject(err)
      resolve()
    }
    return pipeline(...streams, onfinished)
  })
}
/**
 * Pipe a chain of streams together and propagate errors: the first error
 * destroys every stream in the chain.
 * @param {object|Array} stream first stream, or an array of streams
 * @param {...*} streams remaining streams; a trailing function is the completion callback
 * @returns {object} the last stream of the chain
 * @throws {Error} when fewer than two streams are given
 */
function pipeline (stream, ...streams) {
  const all = Array.isArray(stream) ? [...stream, ...streams] : [stream, ...streams]

  let done = null
  if (all.length && typeof all[all.length - 1] === 'function') done = all.pop()

  if (all.length < 2) throw new Error('Pipeline requires at least 2 streams')

  let error = null
  let src = all[0]
  let dest = null

  for (let index = 1; index < all.length; index++) {
    dest = all[index]

    if (isStreamx(src)) {
      src.pipe(dest, onerror)
    } else {
      // Foreign streams need manual error/close tracking; every stream but
      // the first is also watched as a writable.
      errorHandle(src, true, index > 1, onerror)
      src.pipe(dest)
    }

    src = dest
  }

  if (done) {
    let finished = false

    const autoDestroy = isStreamx(dest) || !!(dest._writableState && dest._writableState.autoDestroy)

    dest.on('error', (err) => {
      if (error === null) error = err
    })

    dest.on('finish', () => {
      finished = true
      if (!autoDestroy) done(error)
    })

    // Auto-destroying destinations report on 'close' instead of 'finish'.
    if (autoDestroy) {
      dest.on('close', () => done(error || (finished ? null : PREMATURE_CLOSE)))
    }
  }

  return dest

  // Watch a non-streamx stream for errors and for closing before it ended.
  function errorHandle (s, rd, wr, onerror) {
    const onclose = () => {
      if (rd && s._readableState && !s._readableState.ended) return onerror(PREMATURE_CLOSE)
      if (wr && s._writableState && !s._writableState.ended) return onerror(PREMATURE_CLOSE)
    }

    s.on('error', onerror)
    s.on('close', onclose)
  }

  // Record the first error and tear the whole chain down with it.
  function onerror (err) {
    if (!err || error) return
    error = err

    for (const member of all) {
      member.destroy(err)
    }
  }
}
/**
 * Whether `stream` looks like a stream, i.e. carries a truthy
 * `_readableState` or `_writableState`.
 * @param {*} stream value to test; null/undefined yield false instead of throwing
 * @returns {boolean}
 */
function isStream (stream) {
  // Property access on null/undefined throws a TypeError, so rule those out first.
  if (stream === null || stream === undefined) return false
  return !!stream._readableState || !!stream._writableState
}
/**
 * Whether `stream` is a stream created by this module: it must pass
 * `isStream` and carry a numeric `_duplexState`.
 * @param {*} stream value to test; null/undefined yield false instead of throwing
 * @returns {boolean}
 */
function isStreamx (stream) {
  // Reading `_duplexState` off null/undefined throws a TypeError, so rule those out first.
  if (stream === null || stream === undefined) return false
  return typeof stream._duplexState === 'number' && isStream(stream)
}
/**
 * Return the error recorded on the stream's read or write state. The generic
 * "destroyed" sentinel is reported as null.
 * @param {object} stream
 * @returns {*} the recorded error, null for the sentinel, or the falsy value found when nothing was recorded
 */
function getStreamError (stream) {
  const readState = stream._readableState
  const writeState = stream._writableState

  const recorded = (readState && readState.error) || (writeState && writeState.error)
  if (recorded === STREAM_DESTROYED) return null // only explicit errors
  return recorded
}
/**
 * Whether `stream` is a stream from this module with a readable side.
 * @param {*} stream
 * @returns {*} `stream.readable` when `isStreamx(stream)` holds, otherwise the falsy result of `isStreamx`
 */
function isReadStreamx (stream) {
  const streamx = isStreamx(stream)
  return streamx ? stream.readable : streamx
}
/**
 * Whether `data` is a non-null object exposing a numeric `byteLength`
 * (this is a duck-type check, not an `instanceof` test).
 * @param {*} data
 * @returns {boolean}
 */
function isTypedArray (data) {
  if (data === null || typeof data !== 'object') return false
  return typeof data.byteLength === 'number'
}
/**
 * Default size estimate for a chunk: its `byteLength` when it has a numeric
 * one, otherwise a flat 1024.
 * @param {*} data
 * @returns {number}
 */
function defaultByteLength (data) {
  if (isTypedArray(data)) {
    return data.byteLength
  }
  return 1024
}
- function noop () {} // intentionally empty; registered as a placeholder 'error' listener when piping with a callback
/**
 * 'abort' listener for `opts.signal`; called with the stream as `this`.
 * Destroys the stream with a fresh "aborted" error.
 */
function abort () {
  const reason = new Error('Stream aborted.')
  this.destroy(reason)
}
/**
 * Whether `s` provides its own `_writev`, i.e. one that is neither the
 * Writable nor the Duplex default.
 * @param {object} s
 * @returns {boolean}
 */
function isWritev (s) {
  const impl = s._writev
  const isDefault = impl === Writable.prototype._writev || impl === Duplex.prototype._writev
  return !isDefault
}
- module.exports = {
- pipeline,
- pipelinePromise,
- isStream,
- isStreamx,
- getStreamError,
- Stream,
- Writable,
- Readable,
- Duplex,
- Transform,
- // PassThrough is exported for compatibility with the Node.js core stream module
- PassThrough
- }
|