fix: closer

This commit is contained in:
orion 2024-05-07 11:00:31 -05:00
parent f3e0a6095e
commit ad04aab031
Signed by: orion
GPG Key ID: 6D4165AE4C928719
5 changed files with 8208 additions and 88 deletions

7941
index.js Executable file

File diff suppressed because it is too large Load Diff

7
index.js.map Normal file

File diff suppressed because one or more lines are too long

View File

@ -1,20 +1,54 @@
import Stream from "stream";
const DEBUG = process.env['NODEJS_OBJECT_STREAM_TRACE'] !== ''
/** @type {(s: string) => void} */
const log = m => DEBUG ? console.log(m) : undefined;
let chainCount = 0
let composeCount = 0
let neverCount = 0
let onceCount = 0
let bindCount = 0
let zipCount = 0
let mapCount = 0
let constCount = 0
let fromPromiseCount = 0
export class Never extends Stream.Readable {
constructor() {
super({read: function() {
this.push(null)
}, objectMode: true})
super({objectMode: true})
this.id = neverCount++
}
_read() {
log(`Never {id: ${this.id}}#_read()`)
log(` this.push(null)`)
this.push(null)
}
}
/** @template T */
export class Once extends Stream.Readable {
export class Once extends Stream.Duplex {
/** @param {T} a */
constructor(a) {
super({read: function() { }, objectMode: true})
this.push(a)
super({objectMode: true, allowHalfOpen: false})
this.a = a
this.id = onceCount++
this.push(this.a)
this.push(null)
log(`Once {id: ${this.id}}#new()`)
log(` this.push(${a})`)
log(` this.push(null)`)
}
/** @type {Stream.Duplex['_write']} */
_write(_ck, _enc, cb) {
cb()
}
_read() {
log(`Once {id: ${this.id}}#_read()`)
}
}
@ -22,9 +56,17 @@ export class Once extends Stream.Readable {
export class Const extends Stream.Transform {
/** @param {T} a */
constructor(a) {
super({transform: function(_c, _enc, cb) {
cb(null, a)
}, objectMode: true})
super({objectMode: true})
this.a = a
this.id = constCount++
}
/** @type {Stream.Transform['_transform']} */
_transform(_c, _enc, cb) {
log(`Const {id: ${this.id}}#_transform(${_c}, _, _)`)
log(` cb(${this.a})`)
this.push(this.a)
cb()
}
}
@ -32,16 +74,27 @@ export class Const extends Stream.Transform {
export class FromPromise extends Stream.Readable {
/** @param {Promise<T>} p */
constructor(p) {
p
.then(a => {
this.push(a)
this.push(null)
})
.catch(e => {
this.destroy(e)
})
super({objectMode: true})
this.id = fromPromiseCount++
p
.then(a => {
log(`FromPromise {id: ${this.id}}#new()`)
log(` ...p.then(...)`)
log(` this.push(${a})`)
log(` this.push(null)`)
this.push(a)
this.push(null)
})
.catch(e => {
log(`FromPromise {id: ${this.id}}#new()`)
log(` ...p.catch(...)`)
log(` this.destroy(${e})`)
this.destroy(e)
})
}
super({read: function() {}, objectMode: true})
_read() {
log(`FromPromise {id: ${this.id}}#_read()`)
}
}
@ -50,39 +103,56 @@ export class Chain extends Stream.Readable {
/** @param {...Stream.Readable} streams */
constructor(...streams) {
super({objectMode: true})
this.id = chainCount++
this.ix = -1
this.streams = streams
/** @type {Stream.Readable | undefined} */
this.cur = undefined
this.next()
}
next() {
log(`Chain {id: ${this.id}}#next()`)
this.ix++
if (this.ix === this.streams.length) {
return undefined
log(` this.push(null)`)
this.push(null)
} else {
this.cur = this.streams[this.ix]
const cur = this.streams[this.ix]
this.cur.once('end', () => {
cur.once('error', e => {
log(`Chain {id: ${this.id}}#next()`)
log(` cur.once('error', ...)`)
log(` this.destroy(${e})`)
this.destroy(e)
})
cur.once('end', () => {
log(`Chain {id: ${this.id}}#next()`)
log(` cur.once('end', ...)`)
log(` this.next()`)
this.next()
})
this.cur.on('data', ck => {
cur.on('data', ck => {
log(`Chain {id: ${this.id}}#next()`)
log(` cur.on('data', ...)`)
log(` this.push(${ck})`)
const canPush = this.push(ck)
if (this.cur && !canPush) {
this.cur.pause()
if (cur && !canPush) {
log(` cur.pause()`)
cur.pause()
}
})
}
}
_read() {
if (!this.cur) {
this.push(null)
} else if (this.cur.isPaused()) {
this.cur.resume()
}
log(`Chain {id: ${this.id}}#_read()`)
this.streams.forEach(s => {
if (s.isPaused()) {
log(` s.resume()`)
s.resume()
}
})
}
}
@ -93,18 +163,18 @@ export class Chain extends Stream.Readable {
export class Map extends Stream.Transform {
/** @param {(t: T) => R} f */
constructor(f) {
super({
transform: function (chunk, _, cb) {
try {
this.push(f(chunk))
cb();
} catch (e) {
// @ts-ignore
cb(e);
}
},
objectMode: true
})
super({objectMode: true})
this.f = f
this.id = mapCount++
}
/** @type {Stream.Transform['_transform']} */
_transform(ck, _, cb) {
log(`Map {id: ${this.id}}#_transform(${ck}, _, _)`)
const r = this.f(ck)
log(` const r = (${r})`)
log(` cb(null, ${r})`)
cb(null, r)
}
}
@ -118,25 +188,41 @@ export class Zip extends Stream.Readable {
/** @param {...Stream.Readable} streams */
constructor(...streams) {
super({objectMode: true})
this.id = zipCount++
log(`Zip {id: ${this.id}}#new()`)
log(` this.streams = Array {streams: ${streams.length}}`)
this.streams = streams
this.streams.forEach((s, ix) => {
log(` this.streams[${ix}].once('error', ...)`)
log(` this.streams[${ix}].once('end', ...)`)
log(` this.streams[${ix}].once('data', ...)`)
s.once('error', e => this.destroy(e))
s.once('end', () => this.push(null))
s.on('data', ck => {
const canPush = this.bufput(ix, ck)
if (!canPush) {
this.streams.forEach(s => s.pause())
}
log(`Zip {id: ${this.id}}#new()`)
log(` this.streams[${ix}].once('data', ...)`)
log(` this.bufput(${ix}, ${ck})`)
log(` stream.pause()`)
this.bufput(ix, ck)
s.pause()
})
})
}
/** @type {(ix: number, val: unknown) => boolean} */
bufput(ix, val) {
log(`Zip {id: ${this.id}}#bufput(${ix}, ${val})`)
const bufstr = JSON.stringify(this.buf.map(a => a === null ? 'null' : '..'))
log(` this.buf = ${bufstr}`)
this.buf[ix] = val
if (!this.isWaiting()) {
log(` this.push(${bufstr})`)
const canPush = this.push(this.buf)
this.bufinit()
if (canPush) {
log(` this.streams.forEach(s => s.resume())`)
this.streams.forEach(s => s.resume())
}
return canPush
} else {
return true
@ -144,7 +230,9 @@ export class Zip extends Stream.Readable {
}
bufinit() {
this.buf = this.streams.map(() => null)
const nuls = this.streams.map(() => null)
log(` this.buf = ${JSON.stringify(nuls)}`)
this.buf = nuls
}
isWaiting() {
@ -152,6 +240,7 @@ export class Zip extends Stream.Readable {
}
_read() {
log(`Zip {id: ${this.id}}#_read()`)
this.streams.forEach(s => {
if (s.isPaused()) {
s.resume()
@ -167,51 +256,100 @@ export class Compose extends Stream.Duplex {
*/
constructor(a, b) {
super({objectMode: true})
this.id = composeCount++
this.a = a
this.b = b
log(`Compose {id: ${this.id}}#new()`)
log(` a.on('data', ...)`)
log(` a.once('end', ...)`)
log(` a.once('error', ...)`)
log(` a.pause()`)
log(` b.on('drain', ...)`)
log(` b.on('data', ...)`)
log(` b.on('error', ...)`)
log(` b.on('finish', ...)`)
this.a.once('end', () => {
log(`Compose {id: ${this.id}}#new()`)
log(` a.on('end', ...)`)
log(` b.end()`)
this.b.end()
})
this.a.on('data', ck => {
log(`Compose {id: ${this.id}}#new()`)
log(` a.on('data', ...)`)
log(` b.write(${ck})`)
const canWrite = this.b.write(ck)
if (!canWrite) {
log(` a.pause()`)
this.a.pause()
}
})
this.a.once('error', e => this.destroy(e))
this.a.once('end', () => this.push(null))
this.a.once('error', e => {
log(`Compose {id: ${this.id}}#new()`)
log(` a.once('error', ...)`)
log(` this.destroy(${e})`)
this.destroy(e)
this.b.destroy(e)
})
this.b.on('drain', () => {
if (this.a.isPaused()) {
this.a.resume()
}
log(`Compose {id: ${this.id}}#new()`)
log(` b.on('drain', ...)`)
log(` this.a.resume()`)
this.a.resume()
})
this.b.on('data', ck => {
log(`Compose {id: ${this.id}}#new()`)
log(` b.on('data', ...)`)
log(` this.push(${ck})`)
const canPush = this.push(ck)
if (!canPush) {
this.a.pause()
log(` b.pause()`)
this.b.pause()
}
})
this.b.once('error', e => this.destroy(e))
this.b.once('end', () => this.emit('end'))
this.b.once('finish', () => this.emit('finish'))
this.b.once('error', e => {
log(`Compose {id: ${this.id}}#new()`)
log(` b.once('error', ...)`)
log(` this.destroy(${e})`)
this.destroy(e)
this.a.destroy(e)
})
this.b.once('finish', () => {
log(`Compose {id: ${this.id}}#new()`)
log(` b.once('finish', ...)`)
log(` this.emit('finish')`)
this.push(null)
this.end()
this.emit('finish')
})
}
_read() {
if (this.a.isPaused()) {
this.a.resume()
}
log(`Compose {id: ${this.id}}#_read()`)
if (this.b.isPaused()) {
log(` b.resume()`)
this.b.resume()
}
}
/** @type {Stream.Duplex['_write']} */
_write(ck, _enc, cb) {
log(`Compose {id: ${this.id}}#_write(${ck}, _, _)`)
if (this.a instanceof Stream.Readable) {
throw new Error('Cannot `write` to a Readable stream')
}
log(` this.a.write(${ck}, _, _)`)
this.a.write(ck, _enc, cb)
}
}
@ -224,46 +362,78 @@ export class Bind extends Stream.Duplex {
* @param {(t: T) => () => Stream.Readable} f
*/
constructor(f) {
super({objectMode: true})
super({objectMode: true, allowHalfOpen: true})
this.f = f
this.id = bindCount++
/** @type {Stream.Readable | undefined } */
this.cur = undefined
this.ix = 0
/** @type {Array<Stream.Readable>} */
this.streams = []
/** @type {(() => void) | undefined} */
this.doneCb = undefined
this.paused = true
}
initcur() {
if (!this.cur) {
this.cur = this.streams[0]
/** @type {NonNullable<Stream.Duplex['_final']>} */
_final(cb) {
log(`Bind {id: ${this.id}}#_final(_)`)
this.doneCb = cb
}
init() {
log(`Bind {id: ${this.id}}#init()`)
const s = this.streams[this.ix]
if (this.paused || (!s && !this.doneCb)) {
this.paused = true
return
} else if (this.doneCb) {
log(` this.doneCb()`)
this.doneCb()
this.doneCb = undefined
return
}
this.cur.on('data', ck => {
log(` s.on('data', ...)`)
s.on('data', ck => {
log(`Bind {id: ${this.id}}#initcur()`)
log(` s.on('data', ...)`)
log(` this.push(${ck})`)
const canPush = this.push(ck)
if (!canPush && this.cur) {
this.cur.pause()
if (!canPush) {
s.pause()
}
})
this.cur.on('end', () => {
this.streams.shift()
if (this.streams.length > 0) {
this.cur = this.streams[0]
this.initcur()
} else {
this.cur = undefined
}
log(` s.once('end', ...)`)
s.once('end', () => {
log(`Bind {id: ${this.id}}#initcur()`)
log(` s.once('end', ...)`)
log(` this.ix++`)
log(` this.init()`)
this.ix++
this.init()
})
}
/** @type {Stream.Duplex['_write']} */
_write(ck, _, cb) {
log(`Bind {id: ${this.id}}#_write(${ck}, _, _)`)
try {
log(` this.streams = ${JSON.stringify(this.streams.map(_ => 'Readable'))}`)
this.streams.push(this.f(ck)())
this.initcur()
if (this.paused) {
log(` this.init()`)
this.paused = false
this.init()
}
log(` cb()`)
cb()
} catch(e) {
log(` cb(${e})`)
// @ts-ignore
cb(e)
}
@ -271,9 +441,9 @@ export class Bind extends Stream.Duplex {
/** @type {Stream.Duplex['_read']} */
_read() {
if (this.cur && this.cur.isPaused()) {
this.cur.resume()
}
log(`Bind {id: ${this.id}}#_read()`)
this.streams.forEach(s =>
s.isPaused() ? s.resume() : undefined)
}
}

View File

@ -43,7 +43,7 @@ instance Apply (ObjectStream i) where
apply (ObjectStream iab) (ObjectStream ia) = wrap $ join $ pure applyImpl <*> iab <*> ia
instance Applicative (ObjectStream i) where
pure = wrap <<< constImpl
pure = once
instance Monad (ObjectStream i)
@ -67,8 +67,8 @@ instance Profunctor ObjectStream where
sac <- pipeImpl sab sbc
pipeImpl sac scd
-- | A stream that will emit the value `a` exactly once.
once :: forall a. a -> ObjectStream Unit a
-- | A stream that will emit the value `a` exactly once, and ignores any written chunks.
once :: forall i a. a -> ObjectStream i a
once = wrap <<< onceImpl
-- | A stream that will immediately emit `close` and `end` events.
@ -141,9 +141,6 @@ run (ObjectStream s') = do
cancelError
cancelEnd
-- | Constructs a `Transform` stream that always invokes the callback with the provided value.
foreign import constImpl :: forall i a. a -> Effect (Stream i a)
-- | Constructs a Stream that re-emits the outputs from each stream, in order.
foreign import chainImpl :: forall a. Array (Stream Unit a) -> Effect (Stream Unit a)

View File

@ -45,6 +45,11 @@ spec =
it "creates a readable that emits each element" do
out <- Stream.run (Stream.fromFoldable [ 1, 2, 3 ])
out `shouldEqual` [ 1, 2, 3 ]
it "bind maps each number" do
out <- Stream.run do
a <- Stream.fromFoldable [ 1, 2, 3 ]
pure $ a + 1
out `shouldEqual` [ 2, 3, 4 ]
it "bind fans out" do
out <- Stream.run do
a <- Stream.fromFoldable [ 1, 2, 3 ]