extract.js 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. const { Writable, Readable, getStreamError } = require('streamx')
  2. const FIFO = require('fast-fifo')
  3. const b4a = require('b4a')
  4. const headers = require('./headers')
  5. const EMPTY = b4a.alloc(0)
  /**
   * FIFO of incoming chunks that lets the tar parser read arbitrary byte
   * counts, copying only when a read has to span several chunks.
   *
   * Public counters:
   * - `buffered`: bytes currently queued and not yet consumed.
   * - `shifted`: total bytes consumed since construction.
   */
  class BufferList {
    constructor () {
      this.buffered = 0
      this.shifted = 0
      this.queue = new FIFO()

      // Read position inside the chunk currently at the head of `queue`.
      this._offset = 0
    }

    /** Append a chunk to the tail of the list. */
    push (buffer) {
      this.buffered += buffer.byteLength
      this.queue.push(buffer)
    }

    /**
     * Consume up to `size` bytes from the head chunk only. Never crosses a
     * chunk boundary, so it never copies.
     * @param {number} size - maximum number of bytes to take
     * @returns a view of the consumed bytes, or null when nothing is buffered
     */
    shiftFirst (size) {
      // Previously tested `this._buffered`, which is never assigned. That made
      // this guard dead code and let an empty list reach _next() and throw.
      return this.buffered === 0 ? null : this._next(size)
    }

    /**
     * Consume exactly `size` bytes, concatenating across chunks when needed.
     * @param {number} size - number of bytes to take
     * @returns the bytes, `EMPTY` for size 0, or null if fewer than `size`
     *   bytes are buffered (nothing is consumed in that case)
     */
    shift (size) {
      if (size > this.buffered) return null
      if (size === 0) return EMPTY

      let chunk = this._next(size)
      if (size === chunk.byteLength) return chunk // likely case

      const chunks = [chunk]
      let remaining = size - chunk.byteLength
      while (remaining > 0) {
        chunk = this._next(remaining)
        chunks.push(chunk)
        remaining -= chunk.byteLength
      }
      return b4a.concat(chunks)
    }

    // Take at most `size` bytes from the head chunk. The chunk is dropped from
    // the queue once it is fully consumed. Assumes the queue is non-empty.
    _next (size) {
      const buf = this.queue.peek()
      const rem = buf.byteLength - this._offset

      if (size >= rem) {
        // Consume the remainder of the head chunk; avoid subarray if untouched.
        const sub = this._offset ? buf.subarray(this._offset, buf.byteLength) : buf
        this.queue.shift()
        this._offset = 0
        this.buffered -= rem
        this.shifted += rem
        return sub
      }

      this.buffered -= size
      this.shifted += size
      return buf.subarray(this._offset, (this._offset += size))
    }
  }
  /**
   * Readable stream handed to 'entry' listeners for one archive entry.
   * Payload bytes are pushed into it by the owning Extract (`_parent`).
   *
   * @param {Extract} self - the extractor that owns this entry
   * @param {object} header - decoded tar header for this entry
   * @param {number} offset - byte offset of the entry's header in the input
   */
  class Source extends Readable {
    constructor (self, header, offset) {
      super()

      this.header = header
      this.offset = offset

      this._parent = self
    }

    _read (cb) {
      const parent = this._parent

      // An entry without payload has nothing to deliver: end it right away.
      if (this.header.size === 0) this.push(null)

      // Only the entry the parent is currently feeding may resume parsing.
      if (parent._stream === this) parent._update()

      cb(null)
    }

    // Destroying an entry tears down the whole extractor with the same error.
    _predestroy () {
      const err = getStreamError(this)
      this._parent.destroy(err)
    }

    // Stop being the parent's active entry: skip this entry's block padding
    // and let the parser continue with the next header.
    _detach () {
      const parent = this._parent
      if (parent._stream !== this) return

      parent._stream = null
      parent._missing = overflow(this.header.size)
      parent._update()
    }

    _destroy (cb) {
      this._detach()
      cb(null)
    }
  }
  /**
   * Writable stream that parses a tar archive.
   *
   * For every regular header it emits `'entry'` with `(header, stream, next)`.
   * GNU long-name and pax records are not emitted; they are merged into the
   * following entry's header. The next header is not parsed until the
   * listener calls `next`, while the current entry's payload keeps flowing
   * into `stream`.
   */
  class Extract extends Writable {
    /**
     * @param {object} [opts] - streamx Writable options, plus:
     *   - `filenameEncoding` {string} encoding for entry names (default 'utf-8')
     *   - `allowUnknownFormat` {boolean} forwarded to headers.decode
     */
    constructor (opts) {
      super(opts)
      const options = opts || {}

      this._buffer = new BufferList()
      this._offset = 0 // input byte offset of the header being parsed
      this._header = null
      this._stream = null // Source currently receiving payload bytes, if any
      this._missing = 0 // bytes still to route: payload, long-header body or padding
      this._longHeader = false // true while a long/pax header body is pending
      this._callback = noop // pending _write callback
      this._locked = false // true until the last 'entry' listener calls next
      this._finished = false

      this._pax = null
      this._paxGlobal = null
      this._gnuLongPath = null
      this._gnuLongLinkPath = null

      this._filenameEncoding = options.filenameEncoding || 'utf-8'
      this._allowUnknownFormat = !!options.allowUnknownFormat

      this._unlockBound = this._unlock.bind(this)
    }

    // The `next` callback given to 'entry' listeners. It releases the lock and
    // resumes parsing, or tears everything down if the listener passed an error.
    _unlock (err) {
      this._locked = false

      if (err) {
        this.destroy(err)
        this._continueWrite(err)
        return
      }

      this._update()
    }

    // Decode the next 512-byte header block and dispatch it. Returns false
    // when parsing must stop (previous entry still locked, or decode failed).
    _consumeHeader () {
      if (this._locked) return false

      this._offset = this._buffer.shifted

      try {
        this._header = headers.decode(this._buffer.shift(512), this._filenameEncoding, this._allowUnknownFormat)
      } catch (err) {
        this._continueWrite(err)
        return false
      }

      // headers.decode returned no header (presumably a zero/end block — see
      // headers.decode); there is no entry to emit.
      if (!this._header) return true

      switch (this._header.type) {
        case 'gnu-long-path':
        case 'gnu-long-link-path':
        case 'pax-global-header':
        case 'pax-header':
          // Metadata records: read their body next and apply it to the
          // following real entry instead of emitting them.
          this._longHeader = true
          this._missing = this._header.size
          return true
      }

      this._locked = true
      this._applyLongHeaders()

      // Entries with no payload (and directories) get a stream that is never
      // attached as `_stream`, so no payload bytes are routed to it.
      if (this._header.size === 0 || this._header.type === 'directory') {
        this.emit('entry', this._header, this._createStream(), this._unlockBound)
        return true
      }

      this._stream = this._createStream()
      this._missing = this._header.size

      this.emit('entry', this._header, this._stream, this._unlockBound)
      return true
    }

    // Merge any pending GNU long name/link and pax overrides into the current
    // header, then clear them. Global pax state (`_paxGlobal`) is kept.
    _applyLongHeaders () {
      if (this._gnuLongPath) {
        this._header.name = this._gnuLongPath
        this._gnuLongPath = null
      }

      if (this._gnuLongLinkPath) {
        this._header.linkname = this._gnuLongLinkPath
        this._gnuLongLinkPath = null
      }

      if (this._pax) {
        if (this._pax.path) this._header.name = this._pax.path
        if (this._pax.linkpath) this._header.linkname = this._pax.linkpath
        if (this._pax.size) this._header.size = parseInt(this._pax.size, 10)
        this._header.pax = this._pax
        this._pax = null
      }
    }

    // Decode the body of a long/pax header according to the current header type.
    // Per-entry pax records are layered on top of the global pax record.
    _decodeLongHeader (buf) {
      switch (this._header.type) {
        case 'gnu-long-path':
          this._gnuLongPath = headers.decodeLongPath(buf, this._filenameEncoding)
          break
        case 'gnu-long-link-path':
          this._gnuLongLinkPath = headers.decodeLongPath(buf, this._filenameEncoding)
          break
        case 'pax-global-header':
          this._paxGlobal = headers.decodePax(buf)
          break
        case 'pax-header':
          this._pax = this._paxGlobal === null
            ? headers.decodePax(buf)
            : Object.assign({}, this._paxGlobal, headers.decodePax(buf))
          break
      }
    }

    // Consume a fully buffered long-header body (_update checks this first) and
    // schedule its 512-byte block padding to be skipped. Returns false on a
    // decode error, which has already been reported to the write callback.
    _consumeLongHeader () {
      this._longHeader = false
      this._missing = overflow(this._header.size)

      const buf = this._buffer.shift(this._header.size)

      try {
        this._decodeLongHeader(buf)
      } catch (err) {
        this._continueWrite(err)
        return false
      }

      return true
    }

    // Forward up to `_missing` payload bytes from the head chunk to the active
    // entry. Returns false to pause parsing: either the entry's readable buffer
    // is full (push returned false), or the entry is complete and still locked.
    _consumeStream () {
      const buf = this._buffer.shiftFirst(this._missing)
      if (buf === null) return false

      this._missing -= buf.byteLength
      const drained = this._stream.push(buf)

      if (this._missing === 0) {
        this._stream.push(null)
        // If the reader has not drained, the entry stays attached and parsing
        // halts until Source._destroy calls _detach.
        if (drained) this._stream._detach()
        return drained && this._locked === false
      }

      return drained
    }

    _createStream () {
      return new Source(this, this._header, this._offset)
    }

    // Main parse loop: route buffered bytes to the active entry, to long-header
    // decoding, to padding skip, or to the next header. The pending write
    // callback is released only once more input is needed.
    _update () {
      while (this._buffer.buffered > 0 && !this.destroying) {
        if (this._missing > 0) {
          if (this._stream !== null) {
            if (this._consumeStream() === false) return
            continue
          }

          if (this._longHeader === true) {
            // Long headers are decoded in one piece; wait until fully buffered.
            if (this._missing > this._buffer.buffered) break
            if (this._consumeLongHeader() === false) return
            continue
          }

          // No consumer for these bytes (block padding): drop them.
          const ignore = this._buffer.shiftFirst(this._missing)
          if (ignore !== null) this._missing -= ignore.byteLength
          continue
        }

        if (this._buffer.buffered < 512) break
        if (this._stream !== null || this._consumeHeader() === false) return
      }

      this._continueWrite(null)
    }

    // Invoke the pending write callback exactly once.
    _continueWrite (err) {
      const cb = this._callback
      this._callback = noop
      cb(err)
    }

    _write (data, cb) {
      this._callback = cb
      this._buffer.push(data)
      this._update()
    }

    // The input must end on an entry boundary with no leftover bytes.
    _final (cb) {
      this._finished = this._missing === 0 && this._buffer.buffered === 0
      cb(this._finished ? null : new Error('Unexpected end of data'))
    }

    // Release a pending write callback so a destroy does not leave it hanging.
    _predestroy () {
      this._continueWrite(null)
    }

    // Propagate the destroy (and its error, if any) to the active entry.
    _destroy (cb) {
      if (this._stream) this._stream.destroy(getStreamError(this))
      cb(null)
    }

    /**
     * Async-iterate the entries: each step yields the entry stream. Requesting
     * the next step implicitly calls the previous entry's `next` callback.
     */
    [Symbol.asyncIterator] () {
      let error = null

      let promiseResolve = null
      let promiseReject = null

      let entryStream = null
      let entryCallback = null

      const extract = this

      this.on('entry', onentry)
      this.on('error', (err) => { error = err })
      this.on('close', onclose)

      return {
        [Symbol.asyncIterator] () {
          return this
        },
        next () {
          return new Promise(onnext)
        },
        return () {
          return destroy(null)
        },
        throw (err) {
          return destroy(err)
        }
      }

      // Release the entry most recently handed out, if not released yet.
      function consumeCallback (err) {
        if (!entryCallback) return
        const cb = entryCallback
        entryCallback = null
        cb(err)
      }

      function onnext (resolve, reject) {
        if (error) {
          return reject(error)
        }

        // An entry arrived before next() was called: hand it over directly.
        if (entryStream) {
          resolve({ value: entryStream, done: false })
          entryStream = null
          return
        }

        promiseResolve = resolve
        promiseReject = reject

        // Let the parser continue. This may emit 'entry' synchronously, which
        // settles the promise through onentry.
        consumeCallback(null)

        // Settle now if no further entries can arrive: either the archive
        // ended cleanly, or the extractor was destroyed without an error and
        // its 'close' already fired (onclose will not run again). Previously
        // the destroyed case left this promise pending forever.
        if (promiseResolve && (extract._finished || extract.destroyed)) {
          promiseResolve({ value: undefined, done: true })
          promiseResolve = promiseReject = null
        }
      }

      function onentry (header, stream, callback) {
        entryCallback = callback
        // Entry errors reach the extractor via Source._predestroy; silence the
        // entry's own 'error' event here.
        stream.on('error', noop) // no way around this due to tick sillyness

        if (promiseResolve) {
          promiseResolve({ value: stream, done: false })
          promiseResolve = promiseReject = null
        } else {
          entryStream = stream
        }
      }

      function onclose () {
        consumeCallback(error)
        if (!promiseResolve) return
        if (error) promiseReject(error)
        else promiseResolve({ value: undefined, done: true })
        promiseResolve = promiseReject = null
      }

      // Shared implementation of iterator return()/throw(): destroy the
      // extractor and resolve (or reject with `err`) once it has closed.
      function destroy (err) {
        extract.destroy(err)
        consumeCallback(err)
        return new Promise((resolve, reject) => {
          if (extract.destroyed) return resolve({ value: undefined, done: true })
          extract.once('close', function () {
            if (err) reject(err)
            else resolve({ value: undefined, done: true })
          })
        })
      }
    }
  }
  320. module.exports = function extract (opts) {
  321. return new Extract(opts)
  322. }
  /** Placeholder callback/listener that deliberately does nothing. */
  function noop () {
    // intentionally empty
  }
  /**
   * Number of padding bytes needed to round `size` up to the next multiple of
   * 512 (the tar block size); 0 when `size` is already block-aligned.
   * @param {number} size - payload length in bytes
   * @returns {number} padding length in the range 0..511
   */
  function overflow (size) {
    // Bitwise AND keeps only the low 9 bits, i.e. the position within a block.
    const tail = size & 511
    if (tail === 0) return 0
    return 512 - tail
  }