123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- const { Writable, Readable, getStreamError } = require('streamx')
- const FIFO = require('fast-fifo')
- const b4a = require('b4a')
- const headers = require('./headers')
// Shared zero-length buffer returned by BufferList#shift(0) so empty reads do not allocate.
const EMPTY = b4a.alloc(0)
/**
 * Byte queue over the chunks written into the extractor. It hands out
 * exact-size slices, copying only when a slice spans several chunks.
 */
class BufferList {
  constructor () {
    this.buffered = 0 // bytes queued and not yet shifted out
    this.shifted = 0 // total bytes shifted out since creation
    this.queue = new FIFO()

    this._offset = 0 // read position inside the chunk at the head of the queue
  }

  /**
   * Appends a chunk to the tail of the queue.
   * @param {Uint8Array} buffer
   */
  push (buffer) {
    this.buffered += buffer.byteLength
    this.queue.push(buffer)
  }

  /**
   * Takes up to `size` bytes, but only from the head chunk (never joins chunks).
   * @param {number} size - Maximum number of bytes wanted.
   * @returns {Uint8Array|null} A view of at most `size` bytes, or null when nothing is buffered.
   */
  shiftFirst (size) {
    // _next() assumes a head chunk exists, so an empty queue must be caught here.
    return this.buffered === 0 ? null : this._next(size)
  }

  /**
   * Takes exactly `size` bytes, concatenating across chunks when needed.
   * @param {number} size - Number of bytes wanted.
   * @returns {Uint8Array|null} The bytes, EMPTY for size 0, or null if fewer than `size` are buffered.
   */
  shift (size) {
    if (size > this.buffered) return null
    if (size === 0) return EMPTY

    let chunk = this._next(size)

    if (size === chunk.byteLength) return chunk // likely case

    const chunks = [chunk]

    while ((size -= chunk.byteLength) > 0) {
      chunk = this._next(size)
      chunks.push(chunk)
    }

    return b4a.concat(chunks)
  }

  /**
   * Takes at most `size` bytes from the head chunk. The chunk is dropped from
   * the queue once it is fully consumed. The caller must ensure the queue is
   * non-empty.
   * @param {number} size
   * @returns {Uint8Array}
   */
  _next (size) {
    const buf = this.queue.peek()
    const rem = buf.byteLength - this._offset

    if (size >= rem) {
      const sub = this._offset ? buf.subarray(this._offset, buf.byteLength) : buf
      this.queue.shift()
      this._offset = 0
      this.buffered -= rem
      this.shifted += rem
      return sub
    }

    this.buffered -= size
    this.shifted += size

    return buf.subarray(this._offset, (this._offset += size))
  }
}
/**
 * Readable body of a single tar entry. The owning Extract pushes data into it,
 * and reads from it drive the owner's parse loop.
 */
class Source extends Readable {
  /**
   * @param {Extract} self - The extractor that owns this entry.
   * @param {object} header - Decoded header of the entry.
   * @param {number} offset - Byte offset of the entry's header in the written stream.
   */
  constructor (self, header, offset) {
    super()
    this._parent = self
    this.offset = offset
    this.header = header
  }

  _read (cb) {
    const parent = this._parent

    // Zero-length entries have nothing to wait for, so end them immediately.
    if (this.header.size === 0) this.push(null)

    // Only the entry currently being filled may resume the parser.
    if (parent._stream === this) parent._update()

    cb(null)
  }

  _predestroy () {
    // Tearing down one entry tears down the whole extraction with the same error.
    const err = getStreamError(this)
    this._parent.destroy(err)
  }

  // Releases the parent from this entry: its padding becomes the next bytes
  // to skip, and parsing resumes.
  _detach () {
    const parent = this._parent
    if (parent._stream !== this) return

    parent._stream = null
    parent._missing = overflow(this.header.size)
    parent._update()
  }

  _destroy (cb) {
    this._detach()
    cb(null)
  }
}
/**
 * Tar parser exposed as a streamx Writable. Raw archive bytes are written in.
 * Each member is announced with an 'entry' event carrying its decoded header,
 * a Source stream for its body, and a callback that must be called before
 * parsing moves on to the next header.
 */
class Extract extends Writable {
  /**
   * @param {object} [opts] - Also forwarded to the Writable constructor.
   * @param {string} [opts.filenameEncoding='utf-8'] - Encoding handed to the header and long-path decoders.
   * @param {boolean} [opts.allowUnknownFormat=false] - Passed through to headers.decode.
   */
  constructor (opts) {
    super(opts)

    if (!opts) opts = {}

    this._buffer = new BufferList()
    this._offset = 0 // byte offset of the most recently decoded header
    this._header = null
    this._stream = null // entry body stream currently receiving data, if any
    this._missing = 0 // bytes still owed to an entry body, a long-header payload, or padding
    this._longHeader = false // true while _missing counts a gnu/pax long-header payload
    this._callback = noop // callback of the pending _write
    this._locked = false // set when an entry is emitted, cleared by that entry's callback
    this._finished = false
    this._pax = null // per-entry pax records, applied to the next regular header
    this._paxGlobal = null // global pax records, merged into later per-entry pax headers
    this._gnuLongPath = null
    this._gnuLongLinkPath = null
    this._filenameEncoding = opts.filenameEncoding || 'utf-8'
    this._allowUnknownFormat = !!opts.allowUnknownFormat
    this._unlockBound = this._unlock.bind(this)
  }

  // Bound as the entry callback. Releases the parse lock and resumes parsing.
  // On error, it destroys the extractor and fails the pending write instead.
  _unlock (err) {
    this._locked = false

    if (err) {
      this.destroy(err)
      this._continueWrite(err)
      return
    }

    this._update()
  }

  /**
   * Decodes the next 512-byte header from the buffer and emits it as an entry
   * where applicable.
   * @returns {boolean} false when parsing must pause (an emitted entry is still
   *   locked, or an error was passed to the pending write), true to keep looping.
   */
  _consumeHeader () {
    if (this._locked) return false

    this._offset = this._buffer.shifted

    try {
      this._header = headers.decode(this._buffer.shift(512), this._filenameEncoding, this._allowUnknownFormat)
    } catch (err) {
      this._continueWrite(err)
      return false
    }

    // NOTE(review): a null header presumably means an all-zero (end-of-archive) block; confirm in ./headers.
    if (!this._header) return true

    switch (this._header.type) {
      case 'gnu-long-path':
      case 'gnu-long-link-path':
      case 'pax-global-header':
      case 'pax-header':
        // Metadata for the following entry: buffer its payload, decode it later.
        this._longHeader = true
        this._missing = this._header.size
        return true
    }

    // Applying pax records can fail on a malformed size. Report it like a
    // decode error instead of leaving _missing unusable and stalling the parser.
    try {
      this._applyLongHeaders()
    } catch (err) {
      this._continueWrite(err)
      return false
    }

    this._locked = true

    if (this._header.size === 0 || this._header.type === 'directory') {
      this.emit('entry', this._header, this._createStream(), this._unlockBound)
      return true
    }

    this._stream = this._createStream()
    this._missing = this._header.size

    this.emit('entry', this._header, this._stream, this._unlockBound)
    return true
  }

  /**
   * Applies any pending GNU long names and pax records to the current header,
   * then clears them.
   * @throws {Error} If a pax `size` record is not a non-negative safe integer.
   */
  _applyLongHeaders () {
    if (this._gnuLongPath) {
      this._header.name = this._gnuLongPath
      this._gnuLongPath = null
    }

    if (this._gnuLongLinkPath) {
      this._header.linkname = this._gnuLongLinkPath
      this._gnuLongLinkPath = null
    }

    if (this._pax) {
      const pax = this._pax
      this._pax = null

      if (pax.path) this._header.name = pax.path
      if (pax.linkpath) this._header.linkname = pax.linkpath
      if (pax.size) {
        const size = parseInt(pax.size, 10)
        // A NaN or negative size would make _missing unusable, so reject it.
        if (!Number.isSafeInteger(size) || size < 0) {
          throw new Error(`Invalid pax size: ${pax.size}`)
        }
        this._header.size = size
      }
      this._header.pax = pax
    }
  }

  // Decodes a complete long-header payload and stores it for the next entry.
  // Per-entry pax records are layered over any global pax records.
  _decodeLongHeader (buf) {
    switch (this._header.type) {
      case 'gnu-long-path':
        this._gnuLongPath = headers.decodeLongPath(buf, this._filenameEncoding)
        break
      case 'gnu-long-link-path':
        this._gnuLongLinkPath = headers.decodeLongPath(buf, this._filenameEncoding)
        break
      case 'pax-global-header':
        this._paxGlobal = headers.decodePax(buf)
        break
      case 'pax-header':
        this._pax = this._paxGlobal === null
          ? headers.decodePax(buf)
          : Object.assign({}, this._paxGlobal, headers.decodePax(buf))
        break
    }
  }

  /**
   * Consumes a long-header payload (the caller ensures it is fully buffered)
   * and schedules its padding to be skipped.
   * @returns {boolean} false if decoding failed and the error went to the pending write.
   */
  _consumeLongHeader () {
    this._longHeader = false
    this._missing = overflow(this._header.size)

    const buf = this._buffer.shift(this._header.size)

    try {
      this._decodeLongHeader(buf)
    } catch (err) {
      this._continueWrite(err)
      return false
    }

    return true
  }

  /**
   * Pushes buffered body bytes into the current entry stream.
   * @returns {boolean} false when the parse loop should stop: nothing was
   *   buffered, push() reported backpressure, or the body is complete but the
   *   entry is still locked.
   */
  _consumeStream () {
    const buf = this._buffer.shiftFirst(this._missing)
    if (buf === null) return false

    this._missing -= buf.byteLength
    const drained = this._stream.push(buf)

    if (this._missing === 0) {
      this._stream.push(null)
      if (drained) this._stream._detach()
      return drained && this._locked === false
    }

    return drained
  }

  _createStream () {
    return new Source(this, this._header, this._offset)
  }

  /**
   * Main parse loop. While bytes are buffered, it feeds the current entry
   * body, a long-header payload, or padding to skip, and otherwise decodes the
   * next header once 512 bytes are available. It returns early, keeping the
   * write callback pending, while waiting on a consumer. Otherwise it releases
   * the pending write so more data can arrive.
   */
  _update () {
    while (this._buffer.buffered > 0 && !this.destroying) {
      if (this._missing > 0) {
        if (this._stream !== null) {
          if (this._consumeStream() === false) return
          continue
        }

        // Long-header payloads are decoded in one piece, so wait until fully buffered.
        if (this._longHeader === true) {
          if (this._missing > this._buffer.buffered) break
          if (this._consumeLongHeader() === false) return
          continue
        }

        // With no stream and no long header pending, the owed bytes are padding: discard them.
        const ignore = this._buffer.shiftFirst(this._missing)
        if (ignore !== null) this._missing -= ignore.byteLength
        continue
      }

      if (this._buffer.buffered < 512) break
      // An entry that has all its data but has not detached yet still blocks the next header.
      if (this._stream !== null || this._consumeHeader() === false) return
    }

    this._continueWrite(null)
  }

  // Invokes the pending write callback at most once, then clears it.
  _continueWrite (err) {
    const cb = this._callback
    this._callback = noop
    cb(err)
  }

  // Buffers the chunk and parses. The callback is held until parsing can accept more.
  _write (data, cb) {
    this._callback = cb
    this._buffer.push(data)
    this._update()
  }

  // Succeeds only if every written byte was consumed and no body, long-header
  // or padding bytes are still owed.
  _final (cb) {
    this._finished = this._missing === 0 && this._buffer.buffered === 0
    cb(this._finished ? null : new Error('Unexpected end of data'))
  }

  // Releases any pending write callback before destruction.
  _predestroy () {
    this._continueWrite(null)
  }

  // Propagates the extractor's error (if any) to the entry stream in progress.
  _destroy (cb) {
    if (this._stream) this._stream.destroy(getStreamError(this))
    cb(null)
  }

  /**
   * Async iteration over entries. Each step yields the body stream of the next
   * entry. Asking for the next entry calls the previous entry's callback, which
   * resumes parsing. return()/throw() destroy the extractor.
   */
  [Symbol.asyncIterator] () {
    let error = null

    let promiseResolve = null
    let promiseReject = null

    let entryStream = null
    let entryCallback = null

    const extract = this

    this.on('entry', onentry)
    this.on('error', (err) => { error = err })
    this.on('close', onclose)

    return {
      [Symbol.asyncIterator] () {
        return this
      },
      next () {
        return new Promise(onnext)
      },
      return () {
        return destroy(null)
      },
      throw (err) {
        return destroy(err)
      }
    }

    // Calls the current entry's callback at most once.
    function consumeCallback (err) {
      if (!entryCallback) return
      const cb = entryCallback
      entryCallback = null
      cb(err)
    }

    function onnext (resolve, reject) {
      if (error) {
        return reject(error)
      }

      // An entry arrived before next() was called: hand it over now.
      if (entryStream) {
        resolve({ value: entryStream, done: false })
        entryStream = null
        return
      }

      promiseResolve = resolve
      promiseReject = reject

      // The consumer is ready for more, so release the previous entry.
      consumeCallback(null)

      if (extract._finished && promiseResolve) {
        promiseResolve({ value: undefined, done: true })
        promiseResolve = promiseReject = null
      }
    }

    function onentry (header, stream, callback) {
      entryCallback = callback
      // Entry errors also destroy the extractor (Source._predestroy), and the
      // 'error' listener above records them. This no-op only keeps the entry
      // stream's own 'error' from going unhandled ("tick sillyness").
      stream.on('error', noop)

      if (promiseResolve) {
        promiseResolve({ value: stream, done: false })
        promiseResolve = promiseReject = null
      } else {
        entryStream = stream
      }
    }

    function onclose () {
      consumeCallback(error)
      if (!promiseResolve) return
      if (error) promiseReject(error)
      else promiseResolve({ value: undefined, done: true })
      promiseResolve = promiseReject = null
    }

    function destroy (err) {
      extract.destroy(err)
      consumeCallback(err)
      return new Promise((resolve, reject) => {
        if (extract.destroyed) return resolve({ value: undefined, done: true })
        extract.once('close', function () {
          if (err) reject(err)
          else resolve({ value: undefined, done: true })
        })
      })
    }
  }
}
- module.exports = function extract (opts) {
- return new Extract(opts)
- }
// Placeholder callback. It stands in for "no pending write" and silences
// entry-stream 'error' events in the async iterator.
function noop () {
  // intentionally empty
}
/**
 * Number of padding bytes needed after `size` bytes to reach the next
 * 512-byte boundary.
 * The bitwise mask stays correct for integers up to 2^53, because 2^32 is a
 * multiple of 512. A NaN size yields 0.
 * @param {number} size
 * @returns {number} 0..511
 */
function overflow (size) {
  const used = size & 511
  if (used === 0) return 0
  return 512 - used
}
|