This commit is contained in:
orion 2024-05-09 11:01:09 -05:00
parent ad04aab031
commit bbbd84c640
Signed by: orion
GPG Key ID: 6D4165AE4C928719

View File

@ -182,70 +182,125 @@ export class Zip extends Stream.Readable {
/** @type {Array<Stream.Readable>} */ /** @type {Array<Stream.Readable>} */
streams = [] streams = []
/** @type {Array<unknown | null>} */ /** @type {Array<Array<unknown | null>>} */
buf = [] buf = []
/** @type {Timer | null} */
readingTimer = null
/** @type {Timer | null} */
pushingTimer = null
/** @param {...Stream.Readable} streams */ /** @param {...Stream.Readable} streams */
constructor(...streams) { constructor(...streams) {
super({objectMode: true}) super({objectMode: true})
this.id = zipCount++ this.id = zipCount++
log(`Zip {id: ${this.id}}#new()`)
log(` this.streams = Array {streams: ${streams.length}}`)
this.streams = streams this.streams = streams
this.streams.forEach((s, ix) => { this.streams.forEach((s, ix) => {
log(` this.streams[${ix}].once('error', ...)`)
log(` this.streams[${ix}].once('end', ...)`)
log(` this.streams[${ix}].once('data', ...)`)
s.once('error', e => this.destroy(e)) s.once('error', e => this.destroy(e))
s.once('end', () => this.push(null)) s.once('end', () => this.push(null))
s.on('data', ck => { s.on('data', ck => {
log(`Zip {id: ${this.id}}#new()`)
log(` this.streams[${ix}].once('data', ...)`)
log(` this.bufput(${ix}, ${ck})`)
log(` stream.pause()`)
this.bufput(ix, ck) this.bufput(ix, ck)
s.pause() s.pause()
}) })
}) })
} }
/** @type {(ix: number, val: unknown) => boolean} */ /** @type {(ix: number, val: unknown) => void} */
bufput(ix, val) { bufput(ix, val) {
log(`Zip {id: ${this.id}}#bufput(${ix}, ${val})`) if (this.buflastdone()) {
const bufstr = JSON.stringify(this.buf.map(a => a === null ? 'null' : '..')) this.bufnew()
log(` this.buf = ${bufstr}`) }
this.buf[ix] = val const zipped = this.buflast()
if (!this.isWaiting()) { zipped[ix] = val
log(` this.push(${bufstr})`) }
const canPush = this.push(this.buf)
this.bufinit() bufnew() {
if (canPush) { this.buf.push(this.streams.map(_ => null))
log(` this.streams.forEach(s => s.resume())`) }
this.streams.forEach(s => s.resume())
} buflastdone() {
return canPush const l = this.buflast()
} else { return l.every(a => a !== null)
}
bufheaddone() {
const l = this.bufhead()
return l.every(a => a !== null)
}
/** @returns {unknown[]} */
buflast() {
if (this.buf.length === 0) {
this.bufnew()
}
return this.buf[this.buf.length - 1]
}
/** @returns {unknown[]} */
bufhead() {
if (this.buf.length === 0) {
this.bufnew()
}
return this.buf[0]
}
anyended() {
return this.streams.some(s => s.readableEnded)
}
maybeend() {
if (!this.bufheaddone() && this.anyended()) {
this.push(null)
return true return true
} else {
return false
} }
} }
bufinit() { readzipped() {
const nuls = this.streams.map(() => null) if (this.anyended()) {
log(` this.buf = ${JSON.stringify(nuls)}`) return false
this.buf = nuls }
}
isWaiting() { if (this.buflastdone()) {
return this.buf.some(a => a === null) this.bufnew()
this.streams.forEach((s, ix) => {
this.buflast()[ix] = s.read()
})
return this.buflastdone()
} else {
const missingIxs = this.buflast().flatMap((a, ix) => a === null ? [ix] : [])
missingIxs.forEach(ix => {
this.buflast()[ix] = this.streams[ix].read()
})
return this.buflastdone()
}
} }
_read() { _read() {
log(`Zip {id: ${this.id}}#_read()`) if (this.readingTimer) clearTimeout(this.readingTimer);
this.streams.forEach(s => { if (this.pushingTimer) clearTimeout(this.pushingTimer);
if (s.isPaused()) {
s.resume() this.readingTimer = setTimeout(() => {
while (this.readzipped()) {
continue;
} }
}) }, 0)
this.pushingTimer = setTimeout(() => {
let canWrite = true
if (this.maybeend()) {
return
}
while (canWrite && this.bufheaddone()) {
canWrite = this.push(this.bufhead())
this.buf.shift()
}
this.maybeend()
}, 0)
} }
} }