Compare commits
3 Commits
main
...
wip-object
Author | SHA1 | Date | |
---|---|---|---|
bbbd84c640 | |||
ad04aab031 | |||
f3e0a6095e |
7
index.js.map
Normal file
7
index.js.map
Normal file
File diff suppressed because one or more lines are too long
@ -1,6 +1,7 @@
|
||||
{
|
||||
"name": "purescript-csv-stream",
|
||||
"version": "v1.2.17",
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"csv-parse": "^5.5.5",
|
||||
"csv-stringify": "^6.4.6"
|
||||
|
127
spago.lock
127
spago.lock
@ -4,6 +4,7 @@ workspace:
|
||||
path: ./
|
||||
dependencies:
|
||||
- aff: ">=7.1.0 <8.0.0"
|
||||
- aff-promise
|
||||
- arrays: ">=7.3.0 <8.0.0"
|
||||
- bifunctors: ">=6.0.0 <7.0.0"
|
||||
- control: ">=6.0.0 <7.0.0"
|
||||
@ -27,6 +28,7 @@ workspace:
|
||||
- ordered-collections: ">=3.2.0 <4.0.0"
|
||||
- precise-datetime: ">=7.0.0 <8.0.0"
|
||||
- prelude: ">=6.0.1 <7.0.0"
|
||||
- profunctor
|
||||
- record: ">=4.0.0 <5.0.0"
|
||||
- record-extra: ">=5.0.1 <6.0.0"
|
||||
- st: ">=6.2.0 <7.0.0"
|
||||
@ -37,11 +39,16 @@ workspace:
|
||||
- unsafe-coerce: ">=6.0.0 <7.0.0"
|
||||
test_dependencies:
|
||||
- console
|
||||
- spec
|
||||
build_plan:
|
||||
- aff
|
||||
- aff-promise
|
||||
- ansi
|
||||
- arraybuffer-types
|
||||
- arrays
|
||||
- avar
|
||||
- bifunctors
|
||||
- catenable-lists
|
||||
- console
|
||||
- const
|
||||
- contravariant
|
||||
@ -59,7 +66,9 @@ workspace:
|
||||
- foldable-traversable
|
||||
- foreign
|
||||
- foreign-object
|
||||
- fork
|
||||
- formatters
|
||||
- free
|
||||
- functions
|
||||
- functors
|
||||
- gen
|
||||
@ -70,6 +79,7 @@ workspace:
|
||||
- lazy
|
||||
- lists
|
||||
- maybe
|
||||
- mmorph
|
||||
- newtype
|
||||
- node-buffer
|
||||
- node-event-emitter
|
||||
@ -83,6 +93,7 @@ workspace:
|
||||
- parallel
|
||||
- parsing
|
||||
- partial
|
||||
- pipes
|
||||
- precise-datetime
|
||||
- prelude
|
||||
- profunctor
|
||||
@ -90,6 +101,7 @@ workspace:
|
||||
- record-extra
|
||||
- refs
|
||||
- safe-coerce
|
||||
- spec
|
||||
- st
|
||||
- strings
|
||||
- tailrec
|
||||
@ -124,6 +136,21 @@ packages:
|
||||
- tailrec
|
||||
- transformers
|
||||
- unsafe-coerce
|
||||
aff-promise:
|
||||
type: registry
|
||||
version: 4.0.0
|
||||
integrity: sha256-Kq5EupbUpXeUXx4JqGQE7/RTTz/H6idzWhsocwlEFhM=
|
||||
dependencies:
|
||||
- aff
|
||||
- foreign
|
||||
ansi:
|
||||
type: registry
|
||||
version: 7.0.0
|
||||
integrity: sha256-ZMB6HD+q9CXvn9fRCmJ8dvuDrOVHcjombL3oNOerVnE=
|
||||
dependencies:
|
||||
- foldable-traversable
|
||||
- lists
|
||||
- strings
|
||||
arraybuffer-types:
|
||||
type: registry
|
||||
version: 3.0.2
|
||||
@ -148,6 +175,17 @@ packages:
|
||||
- tuples
|
||||
- unfoldable
|
||||
- unsafe-coerce
|
||||
avar:
|
||||
type: registry
|
||||
version: 5.0.0
|
||||
integrity: sha256-e7hf0x4hEpcygXP0LtvfvAQ49Bbj2aWtZT3gqM///0A=
|
||||
dependencies:
|
||||
- aff
|
||||
- effect
|
||||
- either
|
||||
- exceptions
|
||||
- functions
|
||||
- maybe
|
||||
bifunctors:
|
||||
type: registry
|
||||
version: 6.0.0
|
||||
@ -158,6 +196,18 @@ packages:
|
||||
- newtype
|
||||
- prelude
|
||||
- tuples
|
||||
catenable-lists:
|
||||
type: registry
|
||||
version: 7.0.0
|
||||
integrity: sha256-76vYENhwF4BWTBsjeLuErCH2jqVT4M3R1HX+4RwSftA=
|
||||
dependencies:
|
||||
- control
|
||||
- foldable-traversable
|
||||
- lists
|
||||
- maybe
|
||||
- prelude
|
||||
- tuples
|
||||
- unfoldable
|
||||
console:
|
||||
type: registry
|
||||
version: 6.1.0
|
||||
@ -339,6 +389,12 @@ packages:
|
||||
- tuples
|
||||
- typelevel-prelude
|
||||
- unfoldable
|
||||
fork:
|
||||
type: registry
|
||||
version: 6.0.0
|
||||
integrity: sha256-X7u0SuCvFbLbzuNEKLBNuWjmcroqMqit4xEzpQwAP7E=
|
||||
dependencies:
|
||||
- aff
|
||||
formatters:
|
||||
type: registry
|
||||
version: 7.0.0
|
||||
@ -351,6 +407,25 @@ packages:
|
||||
- parsing
|
||||
- prelude
|
||||
- transformers
|
||||
free:
|
||||
type: registry
|
||||
version: 7.1.0
|
||||
integrity: sha256-JAumgEsGSzJCNLD8AaFvuX7CpqS5yruCngi6yI7+V5k=
|
||||
dependencies:
|
||||
- catenable-lists
|
||||
- control
|
||||
- distributive
|
||||
- either
|
||||
- exists
|
||||
- foldable-traversable
|
||||
- invariant
|
||||
- lazy
|
||||
- maybe
|
||||
- prelude
|
||||
- tailrec
|
||||
- transformers
|
||||
- tuples
|
||||
- unsafe-coerce
|
||||
functions:
|
||||
type: registry
|
||||
version: 6.0.0
|
||||
@ -460,6 +535,14 @@ packages:
|
||||
- invariant
|
||||
- newtype
|
||||
- prelude
|
||||
mmorph:
|
||||
type: registry
|
||||
version: 7.0.0
|
||||
integrity: sha256-urZlZNNqGeQFe5D/ClHlR8QgGBNHTMFPtJ5S5IpflTQ=
|
||||
dependencies:
|
||||
- free
|
||||
- functors
|
||||
- transformers
|
||||
newtype:
|
||||
type: registry
|
||||
version: 5.0.0
|
||||
@ -610,6 +693,18 @@ packages:
|
||||
version: 4.0.0
|
||||
integrity: sha256-fwXerld6Xw1VkReh8yeQsdtLVrjfGiVuC5bA1Wyo/J4=
|
||||
dependencies: []
|
||||
pipes:
|
||||
type: registry
|
||||
version: 8.0.0
|
||||
integrity: sha256-kvfqGM4cPA/wCcBHbp5psouFw5dZGvku2462x7ZBwSY=
|
||||
dependencies:
|
||||
- aff
|
||||
- lists
|
||||
- mmorph
|
||||
- prelude
|
||||
- tailrec
|
||||
- transformers
|
||||
- tuples
|
||||
precise-datetime:
|
||||
type: registry
|
||||
version: 7.0.0
|
||||
@ -683,6 +778,38 @@ packages:
|
||||
integrity: sha256-a1ibQkiUcbODbLE/WAq7Ttbbh9ex+x33VCQ7GngKudU=
|
||||
dependencies:
|
||||
- unsafe-coerce
|
||||
spec:
|
||||
type: registry
|
||||
version: 7.6.0
|
||||
integrity: sha256-+merGdQbL9zWONbnt8S8J9afGJ59MQqGtS0qSd3yu4I=
|
||||
dependencies:
|
||||
- aff
|
||||
- ansi
|
||||
- arrays
|
||||
- avar
|
||||
- bifunctors
|
||||
- control
|
||||
- datetime
|
||||
- effect
|
||||
- either
|
||||
- exceptions
|
||||
- foldable-traversable
|
||||
- fork
|
||||
- identity
|
||||
- integers
|
||||
- lists
|
||||
- maybe
|
||||
- newtype
|
||||
- now
|
||||
- ordered-collections
|
||||
- parallel
|
||||
- pipes
|
||||
- prelude
|
||||
- refs
|
||||
- strings
|
||||
- tailrec
|
||||
- transformers
|
||||
- tuples
|
||||
st:
|
||||
type: registry
|
||||
version: 6.2.0
|
||||
|
@ -10,6 +10,8 @@ package:
|
||||
strict: true
|
||||
pedanticPackages: true
|
||||
dependencies:
|
||||
- aff-promise
|
||||
- profunctor
|
||||
- aff: ">=7.1.0 <8.0.0"
|
||||
- arrays: ">=7.3.0 <8.0.0"
|
||||
- bifunctors: ">=6.0.0 <7.0.0"
|
||||
@ -46,5 +48,6 @@ package:
|
||||
main: Test.Main
|
||||
dependencies:
|
||||
- console
|
||||
- spec
|
||||
workspace:
|
||||
extraPackages: {}
|
||||
|
535
src/Node.Stream.Object.js
Normal file
535
src/Node.Stream.Object.js
Normal file
@ -0,0 +1,535 @@
|
||||
import Stream from "stream";
|
||||
|
||||
const DEBUG = process.env['NODEJS_OBJECT_STREAM_TRACE'] !== ''
|
||||
|
||||
/** @type {(s: string) => void} */
|
||||
const log = m => DEBUG ? console.log(m) : undefined;
|
||||
|
||||
let chainCount = 0
|
||||
let composeCount = 0
|
||||
let neverCount = 0
|
||||
let onceCount = 0
|
||||
let bindCount = 0
|
||||
let zipCount = 0
|
||||
let mapCount = 0
|
||||
let constCount = 0
|
||||
let fromPromiseCount = 0
|
||||
|
||||
export class Never extends Stream.Readable {
|
||||
constructor() {
|
||||
super({objectMode: true})
|
||||
this.id = neverCount++
|
||||
}
|
||||
|
||||
_read() {
|
||||
log(`Never {id: ${this.id}}#_read()`)
|
||||
log(` this.push(null)`)
|
||||
this.push(null)
|
||||
}
|
||||
}
|
||||
|
||||
/** @template T */
|
||||
export class Once extends Stream.Duplex {
|
||||
/** @param {T} a */
|
||||
constructor(a) {
|
||||
super({objectMode: true, allowHalfOpen: false})
|
||||
this.a = a
|
||||
this.id = onceCount++
|
||||
this.push(this.a)
|
||||
this.push(null)
|
||||
log(`Once {id: ${this.id}}#new()`)
|
||||
log(` this.push(${a})`)
|
||||
log(` this.push(null)`)
|
||||
}
|
||||
|
||||
/** @type {Stream.Duplex['_write']} */
|
||||
_write(_ck, _enc, cb) {
|
||||
cb()
|
||||
}
|
||||
|
||||
_read() {
|
||||
log(`Once {id: ${this.id}}#_read()`)
|
||||
}
|
||||
}
|
||||
|
||||
/** @template T */
|
||||
export class Const extends Stream.Transform {
|
||||
/** @param {T} a */
|
||||
constructor(a) {
|
||||
super({objectMode: true})
|
||||
this.a = a
|
||||
this.id = constCount++
|
||||
}
|
||||
|
||||
/** @type {Stream.Transform['_transform']} */
|
||||
_transform(_c, _enc, cb) {
|
||||
log(`Const {id: ${this.id}}#_transform(${_c}, _, _)`)
|
||||
log(` cb(${this.a})`)
|
||||
this.push(this.a)
|
||||
cb()
|
||||
}
|
||||
}
|
||||
|
||||
/** @template T */
|
||||
export class FromPromise extends Stream.Readable {
|
||||
/** @param {Promise<T>} p */
|
||||
constructor(p) {
|
||||
super({objectMode: true})
|
||||
this.id = fromPromiseCount++
|
||||
p
|
||||
.then(a => {
|
||||
log(`FromPromise {id: ${this.id}}#new()`)
|
||||
log(` ...p.then(...)`)
|
||||
log(` this.push(${a})`)
|
||||
log(` this.push(null)`)
|
||||
this.push(a)
|
||||
this.push(null)
|
||||
})
|
||||
.catch(e => {
|
||||
log(`FromPromise {id: ${this.id}}#new()`)
|
||||
log(` ...p.catch(...)`)
|
||||
log(` this.destroy(${e})`)
|
||||
this.destroy(e)
|
||||
})
|
||||
}
|
||||
|
||||
_read() {
|
||||
log(`FromPromise {id: ${this.id}}#_read()`)
|
||||
}
|
||||
}
|
||||
|
||||
/** @template T */
|
||||
export class Chain extends Stream.Readable {
|
||||
/** @param {...Stream.Readable} streams */
|
||||
constructor(...streams) {
|
||||
super({objectMode: true})
|
||||
this.id = chainCount++
|
||||
this.ix = -1
|
||||
this.streams = streams
|
||||
this.next()
|
||||
}
|
||||
|
||||
next() {
|
||||
log(`Chain {id: ${this.id}}#next()`)
|
||||
this.ix++
|
||||
if (this.ix === this.streams.length) {
|
||||
log(` this.push(null)`)
|
||||
this.push(null)
|
||||
} else {
|
||||
const cur = this.streams[this.ix]
|
||||
|
||||
cur.once('error', e => {
|
||||
log(`Chain {id: ${this.id}}#next()`)
|
||||
log(` cur.once('error', ...)`)
|
||||
log(` this.destroy(${e})`)
|
||||
this.destroy(e)
|
||||
})
|
||||
|
||||
cur.once('end', () => {
|
||||
log(`Chain {id: ${this.id}}#next()`)
|
||||
log(` cur.once('end', ...)`)
|
||||
log(` this.next()`)
|
||||
this.next()
|
||||
})
|
||||
|
||||
cur.on('data', ck => {
|
||||
log(`Chain {id: ${this.id}}#next()`)
|
||||
log(` cur.on('data', ...)`)
|
||||
log(` this.push(${ck})`)
|
||||
const canPush = this.push(ck)
|
||||
if (cur && !canPush) {
|
||||
log(` cur.pause()`)
|
||||
cur.pause()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
_read() {
|
||||
log(`Chain {id: ${this.id}}#_read()`)
|
||||
this.streams.forEach(s => {
|
||||
if (s.isPaused()) {
|
||||
log(` s.resume()`)
|
||||
s.resume()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @template T
|
||||
* @template R
|
||||
*/
|
||||
export class Map extends Stream.Transform {
|
||||
/** @param {(t: T) => R} f */
|
||||
constructor(f) {
|
||||
super({objectMode: true})
|
||||
this.f = f
|
||||
this.id = mapCount++
|
||||
}
|
||||
|
||||
/** @type {Stream.Transform['_transform']} */
|
||||
_transform(ck, _, cb) {
|
||||
log(`Map {id: ${this.id}}#_transform(${ck}, _, _)`)
|
||||
const r = this.f(ck)
|
||||
log(` const r = (${r})`)
|
||||
log(` cb(null, ${r})`)
|
||||
cb(null, r)
|
||||
}
|
||||
}
|
||||
|
||||
export class Zip extends Stream.Readable {
|
||||
/** @type {Array<Stream.Readable>} */
|
||||
streams = []
|
||||
|
||||
/** @type {Array<Array<unknown | null>>} */
|
||||
buf = []
|
||||
|
||||
/** @type {Timer | null} */
|
||||
readingTimer = null
|
||||
/** @type {Timer | null} */
|
||||
pushingTimer = null
|
||||
|
||||
/** @param {...Stream.Readable} streams */
|
||||
constructor(...streams) {
|
||||
super({objectMode: true})
|
||||
this.id = zipCount++
|
||||
this.streams = streams
|
||||
this.streams.forEach((s, ix) => {
|
||||
s.once('error', e => this.destroy(e))
|
||||
s.once('end', () => this.push(null))
|
||||
s.on('data', ck => {
|
||||
this.bufput(ix, ck)
|
||||
s.pause()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/** @type {(ix: number, val: unknown) => void} */
|
||||
bufput(ix, val) {
|
||||
if (this.buflastdone()) {
|
||||
this.bufnew()
|
||||
}
|
||||
const zipped = this.buflast()
|
||||
zipped[ix] = val
|
||||
}
|
||||
|
||||
bufnew() {
|
||||
this.buf.push(this.streams.map(_ => null))
|
||||
}
|
||||
|
||||
buflastdone() {
|
||||
const l = this.buflast()
|
||||
return l.every(a => a !== null)
|
||||
}
|
||||
|
||||
bufheaddone() {
|
||||
const l = this.bufhead()
|
||||
return l.every(a => a !== null)
|
||||
}
|
||||
|
||||
/** @returns {unknown[]} */
|
||||
buflast() {
|
||||
if (this.buf.length === 0) {
|
||||
this.bufnew()
|
||||
}
|
||||
return this.buf[this.buf.length - 1]
|
||||
}
|
||||
|
||||
/** @returns {unknown[]} */
|
||||
bufhead() {
|
||||
if (this.buf.length === 0) {
|
||||
this.bufnew()
|
||||
}
|
||||
return this.buf[0]
|
||||
}
|
||||
|
||||
anyended() {
|
||||
return this.streams.some(s => s.readableEnded)
|
||||
}
|
||||
|
||||
maybeend() {
|
||||
if (!this.bufheaddone() && this.anyended()) {
|
||||
this.push(null)
|
||||
return true
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
readzipped() {
|
||||
if (this.anyended()) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (this.buflastdone()) {
|
||||
this.bufnew()
|
||||
this.streams.forEach((s, ix) => {
|
||||
this.buflast()[ix] = s.read()
|
||||
})
|
||||
return this.buflastdone()
|
||||
} else {
|
||||
const missingIxs = this.buflast().flatMap((a, ix) => a === null ? [ix] : [])
|
||||
missingIxs.forEach(ix => {
|
||||
this.buflast()[ix] = this.streams[ix].read()
|
||||
})
|
||||
return this.buflastdone()
|
||||
}
|
||||
}
|
||||
|
||||
_read() {
|
||||
if (this.readingTimer) clearTimeout(this.readingTimer);
|
||||
if (this.pushingTimer) clearTimeout(this.pushingTimer);
|
||||
|
||||
this.readingTimer = setTimeout(() => {
|
||||
while (this.readzipped()) {
|
||||
continue;
|
||||
}
|
||||
}, 0)
|
||||
|
||||
this.pushingTimer = setTimeout(() => {
|
||||
let canWrite = true
|
||||
|
||||
if (this.maybeend()) {
|
||||
return
|
||||
}
|
||||
|
||||
while (canWrite && this.bufheaddone()) {
|
||||
canWrite = this.push(this.bufhead())
|
||||
this.buf.shift()
|
||||
}
|
||||
|
||||
this.maybeend()
|
||||
}, 0)
|
||||
}
|
||||
}
|
||||
|
||||
export class Compose extends Stream.Duplex {
|
||||
/**
|
||||
* @param {Stream.Readable | Stream.Transform} a
|
||||
* @param {Stream.Transform} b
|
||||
*/
|
||||
constructor(a, b) {
|
||||
super({objectMode: true})
|
||||
this.id = composeCount++
|
||||
|
||||
this.a = a
|
||||
this.b = b
|
||||
|
||||
log(`Compose {id: ${this.id}}#new()`)
|
||||
log(` a.on('data', ...)`)
|
||||
log(` a.once('end', ...)`)
|
||||
log(` a.once('error', ...)`)
|
||||
log(` a.pause()`)
|
||||
log(` b.on('drain', ...)`)
|
||||
log(` b.on('data', ...)`)
|
||||
log(` b.on('error', ...)`)
|
||||
log(` b.on('finish', ...)`)
|
||||
|
||||
this.a.once('end', () => {
|
||||
log(`Compose {id: ${this.id}}#new()`)
|
||||
log(` a.on('end', ...)`)
|
||||
log(` b.end()`)
|
||||
this.b.end()
|
||||
})
|
||||
|
||||
this.a.on('data', ck => {
|
||||
log(`Compose {id: ${this.id}}#new()`)
|
||||
log(` a.on('data', ...)`)
|
||||
log(` b.write(${ck})`)
|
||||
const canWrite = this.b.write(ck)
|
||||
|
||||
if (!canWrite) {
|
||||
log(` a.pause()`)
|
||||
this.a.pause()
|
||||
}
|
||||
})
|
||||
|
||||
this.a.once('error', e => {
|
||||
log(`Compose {id: ${this.id}}#new()`)
|
||||
log(` a.once('error', ...)`)
|
||||
log(` this.destroy(${e})`)
|
||||
this.destroy(e)
|
||||
this.b.destroy(e)
|
||||
})
|
||||
|
||||
this.b.on('drain', () => {
|
||||
log(`Compose {id: ${this.id}}#new()`)
|
||||
log(` b.on('drain', ...)`)
|
||||
log(` this.a.resume()`)
|
||||
this.a.resume()
|
||||
})
|
||||
|
||||
this.b.on('data', ck => {
|
||||
log(`Compose {id: ${this.id}}#new()`)
|
||||
log(` b.on('data', ...)`)
|
||||
log(` this.push(${ck})`)
|
||||
const canPush = this.push(ck)
|
||||
if (!canPush) {
|
||||
log(` b.pause()`)
|
||||
this.b.pause()
|
||||
}
|
||||
})
|
||||
|
||||
this.b.once('error', e => {
|
||||
log(`Compose {id: ${this.id}}#new()`)
|
||||
log(` b.once('error', ...)`)
|
||||
log(` this.destroy(${e})`)
|
||||
this.destroy(e)
|
||||
this.a.destroy(e)
|
||||
})
|
||||
|
||||
this.b.once('finish', () => {
|
||||
log(`Compose {id: ${this.id}}#new()`)
|
||||
log(` b.once('finish', ...)`)
|
||||
log(` this.emit('finish')`)
|
||||
this.push(null)
|
||||
this.end()
|
||||
this.emit('finish')
|
||||
})
|
||||
}
|
||||
|
||||
_read() {
|
||||
log(`Compose {id: ${this.id}}#_read()`)
|
||||
if (this.b.isPaused()) {
|
||||
log(` b.resume()`)
|
||||
this.b.resume()
|
||||
}
|
||||
}
|
||||
|
||||
/** @type {Stream.Duplex['_write']} */
|
||||
_write(ck, _enc, cb) {
|
||||
log(`Compose {id: ${this.id}}#_write(${ck}, _, _)`)
|
||||
if (this.a instanceof Stream.Readable) {
|
||||
throw new Error('Cannot `write` to a Readable stream')
|
||||
}
|
||||
|
||||
log(` this.a.write(${ck}, _, _)`)
|
||||
this.a.write(ck, _enc, cb)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @template T
|
||||
*/
|
||||
export class Bind extends Stream.Duplex {
|
||||
/**
|
||||
* @param {(t: T) => () => Stream.Readable} f
|
||||
*/
|
||||
constructor(f) {
|
||||
super({objectMode: true, allowHalfOpen: true})
|
||||
this.f = f
|
||||
this.id = bindCount++
|
||||
|
||||
this.ix = 0
|
||||
|
||||
/** @type {Array<Stream.Readable>} */
|
||||
this.streams = []
|
||||
|
||||
/** @type {(() => void) | undefined} */
|
||||
this.doneCb = undefined
|
||||
|
||||
this.paused = true
|
||||
}
|
||||
|
||||
/** @type {NonNullable<Stream.Duplex['_final']>} */
|
||||
_final(cb) {
|
||||
log(`Bind {id: ${this.id}}#_final(_)`)
|
||||
this.doneCb = cb
|
||||
}
|
||||
|
||||
init() {
|
||||
log(`Bind {id: ${this.id}}#init()`)
|
||||
|
||||
const s = this.streams[this.ix]
|
||||
if (this.paused || (!s && !this.doneCb)) {
|
||||
this.paused = true
|
||||
return
|
||||
} else if (this.doneCb) {
|
||||
log(` this.doneCb()`)
|
||||
this.doneCb()
|
||||
this.doneCb = undefined
|
||||
return
|
||||
}
|
||||
|
||||
log(` s.on('data', ...)`)
|
||||
s.on('data', ck => {
|
||||
log(`Bind {id: ${this.id}}#initcur()`)
|
||||
log(` s.on('data', ...)`)
|
||||
log(` this.push(${ck})`)
|
||||
const canPush = this.push(ck)
|
||||
if (!canPush) {
|
||||
s.pause()
|
||||
}
|
||||
})
|
||||
|
||||
log(` s.once('end', ...)`)
|
||||
s.once('end', () => {
|
||||
log(`Bind {id: ${this.id}}#initcur()`)
|
||||
log(` s.once('end', ...)`)
|
||||
log(` this.ix++`)
|
||||
log(` this.init()`)
|
||||
this.ix++
|
||||
this.init()
|
||||
})
|
||||
}
|
||||
|
||||
/** @type {Stream.Duplex['_write']} */
|
||||
_write(ck, _, cb) {
|
||||
log(`Bind {id: ${this.id}}#_write(${ck}, _, _)`)
|
||||
try {
|
||||
log(` this.streams = ${JSON.stringify(this.streams.map(_ => 'Readable'))}`)
|
||||
this.streams.push(this.f(ck)())
|
||||
if (this.paused) {
|
||||
log(` this.init()`)
|
||||
this.paused = false
|
||||
this.init()
|
||||
}
|
||||
log(` cb()`)
|
||||
cb()
|
||||
} catch(e) {
|
||||
log(` cb(${e})`)
|
||||
// @ts-ignore
|
||||
cb(e)
|
||||
}
|
||||
}
|
||||
|
||||
/** @type {Stream.Duplex['_read']} */
|
||||
_read() {
|
||||
log(`Bind {id: ${this.id}}#_read()`)
|
||||
this.streams.forEach(s =>
|
||||
s.isPaused() ? s.resume() : undefined)
|
||||
}
|
||||
}
|
||||
|
||||
/** @type {() => Stream.Readable} */
|
||||
export const neverImpl = () => new Never();
|
||||
|
||||
/** @type {<T>(a: T) => () => Stream.Transform} */
|
||||
export const constImpl = (a) => () => new Const(a);
|
||||
|
||||
/** @type {<T>(a: T) => () => Stream.Readable} */
|
||||
export const onceImpl = (a) => () => new Once(a);
|
||||
|
||||
/** @type {<T>(a: () => Promise<T>) => () => Stream.Readable} */
|
||||
export const fromPromiseImpl = (a) => () => new FromPromise(a())
|
||||
|
||||
/** @type {(ss: Array<Stream.Readable>) => () => Stream.Readable} */
|
||||
export const chainImpl = (ss) => () => new Chain(...ss);
|
||||
|
||||
/** @type {<T, R>(f: (t: T) => R) => () => Stream.Transform} */
|
||||
export const mapImpl = (f) => () => new Map(f);
|
||||
|
||||
/** @type {(iab: Stream.Readable) => (ia: Stream.Readable) => () => Stream.Readable} */
|
||||
export const applyImpl = (iab) => (ia) => () =>
|
||||
new Compose(new Zip(iab, ia), new Map(([ab, a]) => ab(a)))
|
||||
|
||||
/** @type {<T>(f: (t: T) => () => Stream.Readable) => () => Stream.Duplex} */
|
||||
export const bindImpl = (f) => () => new Bind(f)
|
||||
|
||||
/** @type {(a: Stream.Transform | Stream.Readable) => (b: Stream.Transform) => () => Stream.Duplex} */
|
||||
export const pipeImpl = (a) => (b) => () => new Compose(a, b)
|
||||
|
||||
process.on('beforeExit', () => {
|
||||
debugger;
|
||||
})
|
189
src/Node.Stream.Object.purs
Normal file
189
src/Node.Stream.Object.purs
Normal file
@ -0,0 +1,189 @@
|
||||
module Node.Stream.Object where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Control.Monad.Rec.Class (untilJust)
|
||||
import Control.Monad.ST.Global as ST
|
||||
import Control.Monad.ST.Ref as STRef
|
||||
import Control.Promise (Promise)
|
||||
import Control.Promise as Promise
|
||||
import Data.Array as Array
|
||||
import Data.Array.ST as Array.ST
|
||||
import Data.Either (Either(..))
|
||||
import Data.Maybe (Maybe(..))
|
||||
import Data.Newtype (class Newtype, unwrap, wrap)
|
||||
import Data.Profunctor (class Profunctor)
|
||||
import Data.Traversable (class Traversable, traverse)
|
||||
import Effect (Effect)
|
||||
import Effect.Aff (Aff, delay, effectCanceler, launchAff_, makeAff)
|
||||
import Effect.Aff.Class (class MonadAff)
|
||||
import Effect.Class (class MonadEffect, liftEffect)
|
||||
import Effect.Exception (Error)
|
||||
import Effect.Uncurried (mkEffectFn1)
|
||||
import Node.Buffer (Buffer)
|
||||
import Node.EventEmitter (EventHandle(..))
|
||||
import Node.EventEmitter as Event
|
||||
import Node.EventEmitter.UtilTypes (EventHandle1, EventHandle0)
|
||||
import Node.Stream (Readable, Writable)
|
||||
import Unsafe.Coerce (unsafeCoerce)
|
||||
|
||||
foreign import data Stream :: Type -> Type -> Type
|
||||
|
||||
newtype ObjectStream :: Type -> Type -> Type
|
||||
newtype ObjectStream i o = ObjectStream (Effect (Stream i o))
|
||||
|
||||
derive instance Newtype (ObjectStream i o) _
|
||||
|
||||
instance Functor (ObjectStream i) where
|
||||
map :: forall a b. (a -> b) -> ObjectStream i a -> ObjectStream i b
|
||||
map ab (ObjectStream ia) = wrap $ join $ pure pipeImpl <*> ia <*> mapImpl ab
|
||||
|
||||
instance Apply (ObjectStream i) where
|
||||
apply :: forall a b. ObjectStream i (a -> b) -> ObjectStream i a -> ObjectStream i b
|
||||
apply (ObjectStream iab) (ObjectStream ia) = wrap $ join $ pure applyImpl <*> iab <*> ia
|
||||
|
||||
instance Applicative (ObjectStream i) where
|
||||
pure = once
|
||||
|
||||
instance Monad (ObjectStream i)
|
||||
|
||||
instance MonadEffect (ObjectStream Unit) where
|
||||
liftEffect = wrap <<< flip bind onceImpl
|
||||
|
||||
instance MonadAff (ObjectStream Unit) where
|
||||
liftAff = wrap <<< fromPromiseImpl <<< Promise.fromAff
|
||||
|
||||
instance Bind (ObjectStream i) where
|
||||
bind (ObjectStream sia') asb = wrap do
|
||||
sia <- sia'
|
||||
sab <- bindImpl (unwrap <<< asb)
|
||||
pipeImpl sia sab
|
||||
|
||||
instance Profunctor ObjectStream where
|
||||
dimap ab cd (ObjectStream sbc') = wrap do
|
||||
sbc <- sbc'
|
||||
sab <- mapImpl ab
|
||||
scd <- mapImpl cd
|
||||
sac <- pipeImpl sab sbc
|
||||
pipeImpl sac scd
|
||||
|
||||
-- | A stream that will emit the value `a` exactly once, and ignores any written chunks.
|
||||
once :: forall i a. a -> ObjectStream i a
|
||||
once = wrap <<< onceImpl
|
||||
|
||||
-- | A stream that will immediately emit `close` and `end` events.
|
||||
never :: forall a. ObjectStream Unit a
|
||||
never = wrap neverImpl
|
||||
|
||||
-- | A stream that for all input chunks, emits `unit`.
|
||||
sink :: forall a. ObjectStream a Unit
|
||||
sink = pure unit
|
||||
|
||||
-- | Create a stream from a `Foldable` of `a`s
|
||||
fromFoldable :: forall f a. Traversable f => f a -> ObjectStream Unit a
|
||||
fromFoldable = chainMany <<< map once
|
||||
|
||||
-- | Convert a `Readable` stream emitting `Buffer` chunks to an `ObjectStream`
|
||||
fromBufferReadable :: forall r. Readable r -> ObjectStream Unit Buffer
|
||||
fromBufferReadable r = wrap $ pure $ (unsafeCoerce :: Readable r -> Stream Unit Buffer) r
|
||||
|
||||
-- | Convert a `Readable` stream emitting `String` chunks to an `ObjectStream`
|
||||
fromStringReadable :: forall r. Readable r -> ObjectStream Unit String
|
||||
fromStringReadable r = wrap $ pure $ (unsafeCoerce :: Readable r -> Stream Unit String) r
|
||||
|
||||
-- | Convert a `Writable` stream accepting `Buffer` chunks to an `ObjectStream`
|
||||
fromBufferWritable :: forall r. Writable r -> ObjectStream Buffer Unit
|
||||
fromBufferWritable r = wrap $ pure $ (unsafeCoerce :: Writable r -> Stream Buffer Unit) r
|
||||
|
||||
-- | Convert a `Writable` stream accepting `String` chunks to an `ObjectStream`
|
||||
fromStringWritable :: forall r. Writable r -> ObjectStream String Unit
|
||||
fromStringWritable r = wrap $ pure $ (unsafeCoerce :: Writable r -> Stream String Unit) r
|
||||
|
||||
-- | Emit chunks from the first stream, then when exhausted emit chunks from the second.
|
||||
chain :: forall a. ObjectStream Unit a -> ObjectStream Unit a -> ObjectStream Unit a
|
||||
chain a b = chainMany [ a, b ]
|
||||
|
||||
-- | `chain` for an arbitrary number of streams.
|
||||
chainMany :: forall f a. Traversable f => f (ObjectStream Unit a) -> ObjectStream Unit a
|
||||
chainMany as' = wrap do
|
||||
as <- Array.fromFoldable <$> traverse unwrap as'
|
||||
chainImpl as
|
||||
|
||||
run_ :: forall a. ObjectStream Unit a -> Aff Unit
|
||||
run_ = void <<< run
|
||||
|
||||
run :: forall a. ObjectStream Unit a -> Aff (Array a)
|
||||
run (ObjectStream s') = do
|
||||
runningCount <- liftEffect $ ST.toEffect $ STRef.new 0
|
||||
values <- liftEffect $ ST.toEffect $ Array.ST.new
|
||||
s <- liftEffect s'
|
||||
makeAff \res ->
|
||||
let
|
||||
onData a = ST.toEffect do
|
||||
void $ STRef.modify (_ + 1) runningCount
|
||||
void $ Array.ST.push a values
|
||||
void $ STRef.modify (_ - 1) runningCount
|
||||
onError e = res $ Left e
|
||||
onEnd = launchAff_ do
|
||||
untilJust do
|
||||
delay $ wrap $ 1.0
|
||||
running <- liftEffect $ ST.toEffect $ STRef.read runningCount
|
||||
pure $ if running == 0 then Just unit else Nothing
|
||||
values' <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze values
|
||||
liftEffect $ res $ Right values'
|
||||
in
|
||||
do
|
||||
cancelData <- Event.on dataH onData s
|
||||
cancelError <- Event.on errorH onError s
|
||||
cancelEnd <- Event.on endH onEnd s
|
||||
pure $ effectCanceler do
|
||||
cancelData
|
||||
cancelError
|
||||
cancelEnd
|
||||
|
||||
-- | Constructs a Stream that re-emits the outputs from each stream, in order.
|
||||
foreign import chainImpl :: forall a. Array (Stream Unit a) -> Effect (Stream Unit a)
|
||||
|
||||
-- | Pipes a stream's output into another's input, returning the new composite stream.
|
||||
-- |
|
||||
-- | Note that this differs from `Readable#pipe`, which returns the destination stream
|
||||
-- | verbatim to allow chained piping of only _outputs_.
|
||||
foreign import pipeImpl :: forall a b c. Stream a b -> Stream b c -> Effect (Stream a c)
|
||||
|
||||
-- | A readable stream that immediately closes without emitting any chunks.
|
||||
foreign import neverImpl :: forall a. Effect (Stream Unit a)
|
||||
|
||||
-- | Constructs a readable stream from an asynchronous value.
|
||||
foreign import fromPromiseImpl :: forall a. Effect (Promise a) -> Effect (Stream Unit a)
|
||||
|
||||
-- | Constructs a readable stream that emits a single value then closes.
|
||||
foreign import onceImpl :: forall i a. a -> Effect (Stream i a)
|
||||
|
||||
-- | Constructs a `Transform` applying the given function to chunks.
|
||||
foreign import mapImpl :: forall a b. (a -> b) -> Effect (Stream a b)
|
||||
|
||||
-- | Constructs a `Transform` zipping functions from the left stream with values from the right stream.
|
||||
-- |
|
||||
-- | Closes when either stream closes.
|
||||
foreign import applyImpl :: forall i a b. Stream i (a -> b) -> Stream i a -> Effect (Stream i b)
|
||||
|
||||
-- | Constructs a `Transform` which applies a function to each written chunk.
|
||||
-- |
|
||||
-- | The values emitted by the stream returned by this function are then emitted
|
||||
-- | until the temporary stream closes.
|
||||
foreign import bindImpl :: forall a _x b. (a -> Effect (Stream _x b)) -> Effect (Stream a b)
|
||||
|
||||
dataH :: forall i o. EventHandle1 (Stream i o) o
|
||||
dataH = EventHandle "data" mkEffectFn1
|
||||
|
||||
readableH :: forall i o. EventHandle0 (Stream i o)
|
||||
readableH = EventHandle "readable" identity
|
||||
|
||||
closeH :: forall i o. EventHandle0 (Stream i o)
|
||||
closeH = EventHandle "close" identity
|
||||
|
||||
endH :: forall i o. EventHandle0 (Stream i o)
|
||||
endH = EventHandle "end" identity
|
||||
|
||||
errorH :: forall i o. EventHandle1 (Stream i o) Error
|
||||
errorH = EventHandle "error" mkEffectFn1
|
@ -2,11 +2,13 @@ module Test.Main where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Data.Maybe (Maybe(..))
|
||||
import Effect (Effect)
|
||||
import Effect.Class.Console (log)
|
||||
import Effect.Aff (launchAff_)
|
||||
import Test.Node.Stream.Object as Test.Node.Stream.Object
|
||||
import Test.Spec.Reporter (consoleReporter, specReporter)
|
||||
import Test.Spec.Runner (defaultConfig, runSpec')
|
||||
|
||||
main :: Effect Unit
|
||||
main = do
|
||||
log "🍕"
|
||||
log "You should add some tests."
|
||||
|
||||
main = launchAff_ $ runSpec' (defaultConfig { timeout = Nothing }) [ consoleReporter ] do
|
||||
Test.Node.Stream.Object.spec
|
||||
|
57
test/Test/Node.Stream.Object.purs
Normal file
57
test/Test/Node.Stream.Object.purs
Normal file
@ -0,0 +1,57 @@
|
||||
module Test.Node.Stream.Object where
|
||||
|
||||
import Prelude
|
||||
|
||||
import Data.Newtype (wrap)
|
||||
import Effect.Aff (delay)
|
||||
import Effect.Aff.Class (liftAff)
|
||||
import Node.Stream.Object as Stream
|
||||
import Test.Spec (Spec, describe, it)
|
||||
import Test.Spec.Assertions (shouldEqual)
|
||||
|
||||
spec :: Spec Unit
|
||||
spec =
|
||||
describe "Node.Stream.Object" do
|
||||
describe "ObjectStream" do
|
||||
describe "once" do
|
||||
it "emits once" do
|
||||
out <- Stream.run (Stream.once 1)
|
||||
out `shouldEqual` [ 1 ]
|
||||
describe "never" do
|
||||
it "immediately closes" do
|
||||
out <- Stream.run Stream.never
|
||||
out `shouldEqual` ([] :: Array Int)
|
||||
describe "chain" do
|
||||
it "noops" do
|
||||
out <- Stream.run $ Stream.chainMany []
|
||||
out `shouldEqual` ([] :: Array Int)
|
||||
it "works with 1 stream" do
|
||||
out <- Stream.run $ Stream.chainMany [Stream.once 1]
|
||||
out `shouldEqual` [1]
|
||||
it "works with 2 streams" do
|
||||
out <- Stream.run $ Stream.chainMany [Stream.once 1, Stream.once 2]
|
||||
out `shouldEqual` [1, 2]
|
||||
it "does not emit end until last child stream ends" do
|
||||
let
|
||||
delayed n a = liftAff do
|
||||
delay $ wrap n
|
||||
pure a
|
||||
out <- Stream.run $ Stream.chainMany [delayed 10.0 1, delayed 20.0 2]
|
||||
out `shouldEqual` [1, 2]
|
||||
describe "fromFoldable" do
|
||||
it "creates an empty readable" do
|
||||
out <- Stream.run (Stream.fromFoldable [] :: Stream.ObjectStream Unit (Array Int))
|
||||
out `shouldEqual` []
|
||||
it "creates a readable that emits each element" do
|
||||
out <- Stream.run (Stream.fromFoldable [ 1, 2, 3 ])
|
||||
out `shouldEqual` [ 1, 2, 3 ]
|
||||
it "bind maps each number" do
|
||||
out <- Stream.run do
|
||||
a <- Stream.fromFoldable [ 1, 2, 3 ]
|
||||
pure $ a + 1
|
||||
out `shouldEqual` [ 2, 3, 4 ]
|
||||
it "bind fans out" do
|
||||
out <- Stream.run do
|
||||
a <- Stream.fromFoldable [ 1, 2, 3 ]
|
||||
Stream.fromFoldable [a * 10, a * 20]
|
||||
out `shouldEqual` [ 10, 20, 20, 40, 30, 60 ]
|
Loading…
Reference in New Issue
Block a user