diff --git a/src/Node.Stream.Object.js b/src/Node.Stream.Object.js index e732833..5f51ae6 100644 --- a/src/Node.Stream.Object.js +++ b/src/Node.Stream.Object.js @@ -182,70 +182,125 @@ export class Zip extends Stream.Readable { /** @type {Array} */ streams = [] - /** @type {Array} */ + /** @type {Array>} */ buf = [] + /** @type {Timer | null} */ + readingTimer = null + /** @type {Timer | null} */ + pushingTimer = null + /** @param {...Stream.Readable} streams */ constructor(...streams) { super({objectMode: true}) this.id = zipCount++ - log(`Zip {id: ${this.id}}#new()`) - log(` this.streams = Array {streams: ${streams.length}}`) this.streams = streams this.streams.forEach((s, ix) => { - log(` this.streams[${ix}].once('error', ...)`) - log(` this.streams[${ix}].once('end', ...)`) - log(` this.streams[${ix}].once('data', ...)`) s.once('error', e => this.destroy(e)) s.once('end', () => this.push(null)) s.on('data', ck => { - log(`Zip {id: ${this.id}}#new()`) - log(` this.streams[${ix}].once('data', ...)`) - log(` this.bufput(${ix}, ${ck})`) - log(` stream.pause()`) this.bufput(ix, ck) s.pause() }) }) } - /** @type {(ix: number, val: unknown) => boolean} */ + /** @type {(ix: number, val: unknown) => void} */ bufput(ix, val) { - log(`Zip {id: ${this.id}}#bufput(${ix}, ${val})`) - const bufstr = JSON.stringify(this.buf.map(a => a === null ? 'null' : '..')) - log(` this.buf = ${bufstr}`) - this.buf[ix] = val - if (!this.isWaiting()) { - log(` this.push(${bufstr})`) - const canPush = this.push(this.buf) - this.bufinit() - if (canPush) { - log(` this.streams.forEach(s => s.resume())`) - this.streams.forEach(s => s.resume()) - } - return canPush - } else { + if (this.buflastdone()) { + this.bufnew() + } + const zipped = this.buflast() + zipped[ix] = val + } + + bufnew() { + this.buf.push(this.streams.map(_ => null)) + } + + buflastdone() { + const l = this.buflast() + return l.every(a => a !== null) + } + + bufheaddone() { + const l = this.bufhead() + return l.every(a => a !== null) + } + + /** @returns {unknown[]} */ + buflast() { + if (this.buf.length === 0) { + this.bufnew() + } + return this.buf[this.buf.length - 1] + } + + /** @returns {unknown[]} */ + bufhead() { + if (this.buf.length === 0) { + this.bufnew() + } + return this.buf[0] + } + + anyended() { + return this.streams.some(s => s.readableEnded) + } + + maybeend() { + if (!this.bufheaddone() && this.anyended()) { + this.push(null) return true + } else { + return false } } - bufinit() { - const nuls = this.streams.map(() => null) - log(` this.buf = ${JSON.stringify(nuls)}`) - this.buf = nuls - } + readzipped() { + if (this.anyended()) { + return false + } - isWaiting() { - return this.buf.some(a => a === null) + if (this.buflastdone()) { + this.bufnew() + this.streams.forEach((s, ix) => { + this.buflast()[ix] = s.read() + }) + return this.buflastdone() + } else { + const missingIxs = this.buflast().flatMap((a, ix) => a === null ? [ix] : []) + missingIxs.forEach(ix => { + this.buflast()[ix] = this.streams[ix].read() + }) + return this.buflastdone() + } } _read() { - log(`Zip {id: ${this.id}}#_read()`) - this.streams.forEach(s => { - if (s.isPaused()) { - s.resume() + if (this.readingTimer) clearTimeout(this.readingTimer); + if (this.pushingTimer) clearTimeout(this.pushingTimer); + + this.readingTimer = setTimeout(() => { + while (this.readzipped()) { + continue; } - }) + }, 0) + + this.pushingTimer = setTimeout(() => { + let canWrite = true + + if (this.maybeend()) { + return + } + + while (canWrite && this.bufheaddone()) { + canWrite = this.push(this.bufhead()) + this.buf.shift() + } + + this.maybeend() + }, 0) } }