Compare commits
3 Commits
main
...
wip-object
Author | SHA1 | Date | |
---|---|---|---|
bbbd84c640 | |||
ad04aab031 | |||
f3e0a6095e |
7
index.js.map
Normal file
7
index.js.map
Normal file
File diff suppressed because one or more lines are too long
@ -1,6 +1,7 @@
|
|||||||
{
|
{
|
||||||
"name": "purescript-csv-stream",
|
"name": "purescript-csv-stream",
|
||||||
"version": "v1.2.17",
|
"version": "v1.2.17",
|
||||||
|
"type": "module",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"csv-parse": "^5.5.5",
|
"csv-parse": "^5.5.5",
|
||||||
"csv-stringify": "^6.4.6"
|
"csv-stringify": "^6.4.6"
|
||||||
|
127
spago.lock
127
spago.lock
@ -4,6 +4,7 @@ workspace:
|
|||||||
path: ./
|
path: ./
|
||||||
dependencies:
|
dependencies:
|
||||||
- aff: ">=7.1.0 <8.0.0"
|
- aff: ">=7.1.0 <8.0.0"
|
||||||
|
- aff-promise
|
||||||
- arrays: ">=7.3.0 <8.0.0"
|
- arrays: ">=7.3.0 <8.0.0"
|
||||||
- bifunctors: ">=6.0.0 <7.0.0"
|
- bifunctors: ">=6.0.0 <7.0.0"
|
||||||
- control: ">=6.0.0 <7.0.0"
|
- control: ">=6.0.0 <7.0.0"
|
||||||
@ -27,6 +28,7 @@ workspace:
|
|||||||
- ordered-collections: ">=3.2.0 <4.0.0"
|
- ordered-collections: ">=3.2.0 <4.0.0"
|
||||||
- precise-datetime: ">=7.0.0 <8.0.0"
|
- precise-datetime: ">=7.0.0 <8.0.0"
|
||||||
- prelude: ">=6.0.1 <7.0.0"
|
- prelude: ">=6.0.1 <7.0.0"
|
||||||
|
- profunctor
|
||||||
- record: ">=4.0.0 <5.0.0"
|
- record: ">=4.0.0 <5.0.0"
|
||||||
- record-extra: ">=5.0.1 <6.0.0"
|
- record-extra: ">=5.0.1 <6.0.0"
|
||||||
- st: ">=6.2.0 <7.0.0"
|
- st: ">=6.2.0 <7.0.0"
|
||||||
@ -37,11 +39,16 @@ workspace:
|
|||||||
- unsafe-coerce: ">=6.0.0 <7.0.0"
|
- unsafe-coerce: ">=6.0.0 <7.0.0"
|
||||||
test_dependencies:
|
test_dependencies:
|
||||||
- console
|
- console
|
||||||
|
- spec
|
||||||
build_plan:
|
build_plan:
|
||||||
- aff
|
- aff
|
||||||
|
- aff-promise
|
||||||
|
- ansi
|
||||||
- arraybuffer-types
|
- arraybuffer-types
|
||||||
- arrays
|
- arrays
|
||||||
|
- avar
|
||||||
- bifunctors
|
- bifunctors
|
||||||
|
- catenable-lists
|
||||||
- console
|
- console
|
||||||
- const
|
- const
|
||||||
- contravariant
|
- contravariant
|
||||||
@ -59,7 +66,9 @@ workspace:
|
|||||||
- foldable-traversable
|
- foldable-traversable
|
||||||
- foreign
|
- foreign
|
||||||
- foreign-object
|
- foreign-object
|
||||||
|
- fork
|
||||||
- formatters
|
- formatters
|
||||||
|
- free
|
||||||
- functions
|
- functions
|
||||||
- functors
|
- functors
|
||||||
- gen
|
- gen
|
||||||
@ -70,6 +79,7 @@ workspace:
|
|||||||
- lazy
|
- lazy
|
||||||
- lists
|
- lists
|
||||||
- maybe
|
- maybe
|
||||||
|
- mmorph
|
||||||
- newtype
|
- newtype
|
||||||
- node-buffer
|
- node-buffer
|
||||||
- node-event-emitter
|
- node-event-emitter
|
||||||
@ -83,6 +93,7 @@ workspace:
|
|||||||
- parallel
|
- parallel
|
||||||
- parsing
|
- parsing
|
||||||
- partial
|
- partial
|
||||||
|
- pipes
|
||||||
- precise-datetime
|
- precise-datetime
|
||||||
- prelude
|
- prelude
|
||||||
- profunctor
|
- profunctor
|
||||||
@ -90,6 +101,7 @@ workspace:
|
|||||||
- record-extra
|
- record-extra
|
||||||
- refs
|
- refs
|
||||||
- safe-coerce
|
- safe-coerce
|
||||||
|
- spec
|
||||||
- st
|
- st
|
||||||
- strings
|
- strings
|
||||||
- tailrec
|
- tailrec
|
||||||
@ -124,6 +136,21 @@ packages:
|
|||||||
- tailrec
|
- tailrec
|
||||||
- transformers
|
- transformers
|
||||||
- unsafe-coerce
|
- unsafe-coerce
|
||||||
|
aff-promise:
|
||||||
|
type: registry
|
||||||
|
version: 4.0.0
|
||||||
|
integrity: sha256-Kq5EupbUpXeUXx4JqGQE7/RTTz/H6idzWhsocwlEFhM=
|
||||||
|
dependencies:
|
||||||
|
- aff
|
||||||
|
- foreign
|
||||||
|
ansi:
|
||||||
|
type: registry
|
||||||
|
version: 7.0.0
|
||||||
|
integrity: sha256-ZMB6HD+q9CXvn9fRCmJ8dvuDrOVHcjombL3oNOerVnE=
|
||||||
|
dependencies:
|
||||||
|
- foldable-traversable
|
||||||
|
- lists
|
||||||
|
- strings
|
||||||
arraybuffer-types:
|
arraybuffer-types:
|
||||||
type: registry
|
type: registry
|
||||||
version: 3.0.2
|
version: 3.0.2
|
||||||
@ -148,6 +175,17 @@ packages:
|
|||||||
- tuples
|
- tuples
|
||||||
- unfoldable
|
- unfoldable
|
||||||
- unsafe-coerce
|
- unsafe-coerce
|
||||||
|
avar:
|
||||||
|
type: registry
|
||||||
|
version: 5.0.0
|
||||||
|
integrity: sha256-e7hf0x4hEpcygXP0LtvfvAQ49Bbj2aWtZT3gqM///0A=
|
||||||
|
dependencies:
|
||||||
|
- aff
|
||||||
|
- effect
|
||||||
|
- either
|
||||||
|
- exceptions
|
||||||
|
- functions
|
||||||
|
- maybe
|
||||||
bifunctors:
|
bifunctors:
|
||||||
type: registry
|
type: registry
|
||||||
version: 6.0.0
|
version: 6.0.0
|
||||||
@ -158,6 +196,18 @@ packages:
|
|||||||
- newtype
|
- newtype
|
||||||
- prelude
|
- prelude
|
||||||
- tuples
|
- tuples
|
||||||
|
catenable-lists:
|
||||||
|
type: registry
|
||||||
|
version: 7.0.0
|
||||||
|
integrity: sha256-76vYENhwF4BWTBsjeLuErCH2jqVT4M3R1HX+4RwSftA=
|
||||||
|
dependencies:
|
||||||
|
- control
|
||||||
|
- foldable-traversable
|
||||||
|
- lists
|
||||||
|
- maybe
|
||||||
|
- prelude
|
||||||
|
- tuples
|
||||||
|
- unfoldable
|
||||||
console:
|
console:
|
||||||
type: registry
|
type: registry
|
||||||
version: 6.1.0
|
version: 6.1.0
|
||||||
@ -339,6 +389,12 @@ packages:
|
|||||||
- tuples
|
- tuples
|
||||||
- typelevel-prelude
|
- typelevel-prelude
|
||||||
- unfoldable
|
- unfoldable
|
||||||
|
fork:
|
||||||
|
type: registry
|
||||||
|
version: 6.0.0
|
||||||
|
integrity: sha256-X7u0SuCvFbLbzuNEKLBNuWjmcroqMqit4xEzpQwAP7E=
|
||||||
|
dependencies:
|
||||||
|
- aff
|
||||||
formatters:
|
formatters:
|
||||||
type: registry
|
type: registry
|
||||||
version: 7.0.0
|
version: 7.0.0
|
||||||
@ -351,6 +407,25 @@ packages:
|
|||||||
- parsing
|
- parsing
|
||||||
- prelude
|
- prelude
|
||||||
- transformers
|
- transformers
|
||||||
|
free:
|
||||||
|
type: registry
|
||||||
|
version: 7.1.0
|
||||||
|
integrity: sha256-JAumgEsGSzJCNLD8AaFvuX7CpqS5yruCngi6yI7+V5k=
|
||||||
|
dependencies:
|
||||||
|
- catenable-lists
|
||||||
|
- control
|
||||||
|
- distributive
|
||||||
|
- either
|
||||||
|
- exists
|
||||||
|
- foldable-traversable
|
||||||
|
- invariant
|
||||||
|
- lazy
|
||||||
|
- maybe
|
||||||
|
- prelude
|
||||||
|
- tailrec
|
||||||
|
- transformers
|
||||||
|
- tuples
|
||||||
|
- unsafe-coerce
|
||||||
functions:
|
functions:
|
||||||
type: registry
|
type: registry
|
||||||
version: 6.0.0
|
version: 6.0.0
|
||||||
@ -460,6 +535,14 @@ packages:
|
|||||||
- invariant
|
- invariant
|
||||||
- newtype
|
- newtype
|
||||||
- prelude
|
- prelude
|
||||||
|
mmorph:
|
||||||
|
type: registry
|
||||||
|
version: 7.0.0
|
||||||
|
integrity: sha256-urZlZNNqGeQFe5D/ClHlR8QgGBNHTMFPtJ5S5IpflTQ=
|
||||||
|
dependencies:
|
||||||
|
- free
|
||||||
|
- functors
|
||||||
|
- transformers
|
||||||
newtype:
|
newtype:
|
||||||
type: registry
|
type: registry
|
||||||
version: 5.0.0
|
version: 5.0.0
|
||||||
@ -610,6 +693,18 @@ packages:
|
|||||||
version: 4.0.0
|
version: 4.0.0
|
||||||
integrity: sha256-fwXerld6Xw1VkReh8yeQsdtLVrjfGiVuC5bA1Wyo/J4=
|
integrity: sha256-fwXerld6Xw1VkReh8yeQsdtLVrjfGiVuC5bA1Wyo/J4=
|
||||||
dependencies: []
|
dependencies: []
|
||||||
|
pipes:
|
||||||
|
type: registry
|
||||||
|
version: 8.0.0
|
||||||
|
integrity: sha256-kvfqGM4cPA/wCcBHbp5psouFw5dZGvku2462x7ZBwSY=
|
||||||
|
dependencies:
|
||||||
|
- aff
|
||||||
|
- lists
|
||||||
|
- mmorph
|
||||||
|
- prelude
|
||||||
|
- tailrec
|
||||||
|
- transformers
|
||||||
|
- tuples
|
||||||
precise-datetime:
|
precise-datetime:
|
||||||
type: registry
|
type: registry
|
||||||
version: 7.0.0
|
version: 7.0.0
|
||||||
@ -683,6 +778,38 @@ packages:
|
|||||||
integrity: sha256-a1ibQkiUcbODbLE/WAq7Ttbbh9ex+x33VCQ7GngKudU=
|
integrity: sha256-a1ibQkiUcbODbLE/WAq7Ttbbh9ex+x33VCQ7GngKudU=
|
||||||
dependencies:
|
dependencies:
|
||||||
- unsafe-coerce
|
- unsafe-coerce
|
||||||
|
spec:
|
||||||
|
type: registry
|
||||||
|
version: 7.6.0
|
||||||
|
integrity: sha256-+merGdQbL9zWONbnt8S8J9afGJ59MQqGtS0qSd3yu4I=
|
||||||
|
dependencies:
|
||||||
|
- aff
|
||||||
|
- ansi
|
||||||
|
- arrays
|
||||||
|
- avar
|
||||||
|
- bifunctors
|
||||||
|
- control
|
||||||
|
- datetime
|
||||||
|
- effect
|
||||||
|
- either
|
||||||
|
- exceptions
|
||||||
|
- foldable-traversable
|
||||||
|
- fork
|
||||||
|
- identity
|
||||||
|
- integers
|
||||||
|
- lists
|
||||||
|
- maybe
|
||||||
|
- newtype
|
||||||
|
- now
|
||||||
|
- ordered-collections
|
||||||
|
- parallel
|
||||||
|
- pipes
|
||||||
|
- prelude
|
||||||
|
- refs
|
||||||
|
- strings
|
||||||
|
- tailrec
|
||||||
|
- transformers
|
||||||
|
- tuples
|
||||||
st:
|
st:
|
||||||
type: registry
|
type: registry
|
||||||
version: 6.2.0
|
version: 6.2.0
|
||||||
|
@ -10,6 +10,8 @@ package:
|
|||||||
strict: true
|
strict: true
|
||||||
pedanticPackages: true
|
pedanticPackages: true
|
||||||
dependencies:
|
dependencies:
|
||||||
|
- aff-promise
|
||||||
|
- profunctor
|
||||||
- aff: ">=7.1.0 <8.0.0"
|
- aff: ">=7.1.0 <8.0.0"
|
||||||
- arrays: ">=7.3.0 <8.0.0"
|
- arrays: ">=7.3.0 <8.0.0"
|
||||||
- bifunctors: ">=6.0.0 <7.0.0"
|
- bifunctors: ">=6.0.0 <7.0.0"
|
||||||
@ -46,5 +48,6 @@ package:
|
|||||||
main: Test.Main
|
main: Test.Main
|
||||||
dependencies:
|
dependencies:
|
||||||
- console
|
- console
|
||||||
|
- spec
|
||||||
workspace:
|
workspace:
|
||||||
extraPackages: {}
|
extraPackages: {}
|
||||||
|
535
src/Node.Stream.Object.js
Normal file
535
src/Node.Stream.Object.js
Normal file
@ -0,0 +1,535 @@
|
|||||||
|
import Stream from "stream";
|
||||||
|
|
||||||
|
const DEBUG = process.env['NODEJS_OBJECT_STREAM_TRACE'] !== ''
|
||||||
|
|
||||||
|
/** @type {(s: string) => void} */
|
||||||
|
const log = m => DEBUG ? console.log(m) : undefined;
|
||||||
|
|
||||||
|
let chainCount = 0
|
||||||
|
let composeCount = 0
|
||||||
|
let neverCount = 0
|
||||||
|
let onceCount = 0
|
||||||
|
let bindCount = 0
|
||||||
|
let zipCount = 0
|
||||||
|
let mapCount = 0
|
||||||
|
let constCount = 0
|
||||||
|
let fromPromiseCount = 0
|
||||||
|
|
||||||
|
export class Never extends Stream.Readable {
|
||||||
|
constructor() {
|
||||||
|
super({objectMode: true})
|
||||||
|
this.id = neverCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
_read() {
|
||||||
|
log(`Never {id: ${this.id}}#_read()`)
|
||||||
|
log(` this.push(null)`)
|
||||||
|
this.push(null)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @template T */
|
||||||
|
export class Once extends Stream.Duplex {
|
||||||
|
/** @param {T} a */
|
||||||
|
constructor(a) {
|
||||||
|
super({objectMode: true, allowHalfOpen: false})
|
||||||
|
this.a = a
|
||||||
|
this.id = onceCount++
|
||||||
|
this.push(this.a)
|
||||||
|
this.push(null)
|
||||||
|
log(`Once {id: ${this.id}}#new()`)
|
||||||
|
log(` this.push(${a})`)
|
||||||
|
log(` this.push(null)`)
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @type {Stream.Duplex['_write']} */
|
||||||
|
_write(_ck, _enc, cb) {
|
||||||
|
cb()
|
||||||
|
}
|
||||||
|
|
||||||
|
_read() {
|
||||||
|
log(`Once {id: ${this.id}}#_read()`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @template T */
|
||||||
|
export class Const extends Stream.Transform {
|
||||||
|
/** @param {T} a */
|
||||||
|
constructor(a) {
|
||||||
|
super({objectMode: true})
|
||||||
|
this.a = a
|
||||||
|
this.id = constCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @type {Stream.Transform['_transform']} */
|
||||||
|
_transform(_c, _enc, cb) {
|
||||||
|
log(`Const {id: ${this.id}}#_transform(${_c}, _, _)`)
|
||||||
|
log(` cb(${this.a})`)
|
||||||
|
this.push(this.a)
|
||||||
|
cb()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @template T */
|
||||||
|
export class FromPromise extends Stream.Readable {
|
||||||
|
/** @param {Promise<T>} p */
|
||||||
|
constructor(p) {
|
||||||
|
super({objectMode: true})
|
||||||
|
this.id = fromPromiseCount++
|
||||||
|
p
|
||||||
|
.then(a => {
|
||||||
|
log(`FromPromise {id: ${this.id}}#new()`)
|
||||||
|
log(` ...p.then(...)`)
|
||||||
|
log(` this.push(${a})`)
|
||||||
|
log(` this.push(null)`)
|
||||||
|
this.push(a)
|
||||||
|
this.push(null)
|
||||||
|
})
|
||||||
|
.catch(e => {
|
||||||
|
log(`FromPromise {id: ${this.id}}#new()`)
|
||||||
|
log(` ...p.catch(...)`)
|
||||||
|
log(` this.destroy(${e})`)
|
||||||
|
this.destroy(e)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
_read() {
|
||||||
|
log(`FromPromise {id: ${this.id}}#_read()`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @template T */
|
||||||
|
export class Chain extends Stream.Readable {
|
||||||
|
/** @param {...Stream.Readable} streams */
|
||||||
|
constructor(...streams) {
|
||||||
|
super({objectMode: true})
|
||||||
|
this.id = chainCount++
|
||||||
|
this.ix = -1
|
||||||
|
this.streams = streams
|
||||||
|
this.next()
|
||||||
|
}
|
||||||
|
|
||||||
|
next() {
|
||||||
|
log(`Chain {id: ${this.id}}#next()`)
|
||||||
|
this.ix++
|
||||||
|
if (this.ix === this.streams.length) {
|
||||||
|
log(` this.push(null)`)
|
||||||
|
this.push(null)
|
||||||
|
} else {
|
||||||
|
const cur = this.streams[this.ix]
|
||||||
|
|
||||||
|
cur.once('error', e => {
|
||||||
|
log(`Chain {id: ${this.id}}#next()`)
|
||||||
|
log(` cur.once('error', ...)`)
|
||||||
|
log(` this.destroy(${e})`)
|
||||||
|
this.destroy(e)
|
||||||
|
})
|
||||||
|
|
||||||
|
cur.once('end', () => {
|
||||||
|
log(`Chain {id: ${this.id}}#next()`)
|
||||||
|
log(` cur.once('end', ...)`)
|
||||||
|
log(` this.next()`)
|
||||||
|
this.next()
|
||||||
|
})
|
||||||
|
|
||||||
|
cur.on('data', ck => {
|
||||||
|
log(`Chain {id: ${this.id}}#next()`)
|
||||||
|
log(` cur.on('data', ...)`)
|
||||||
|
log(` this.push(${ck})`)
|
||||||
|
const canPush = this.push(ck)
|
||||||
|
if (cur && !canPush) {
|
||||||
|
log(` cur.pause()`)
|
||||||
|
cur.pause()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_read() {
|
||||||
|
log(`Chain {id: ${this.id}}#_read()`)
|
||||||
|
this.streams.forEach(s => {
|
||||||
|
if (s.isPaused()) {
|
||||||
|
log(` s.resume()`)
|
||||||
|
s.resume()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @template T
|
||||||
|
* @template R
|
||||||
|
*/
|
||||||
|
export class Map extends Stream.Transform {
|
||||||
|
/** @param {(t: T) => R} f */
|
||||||
|
constructor(f) {
|
||||||
|
super({objectMode: true})
|
||||||
|
this.f = f
|
||||||
|
this.id = mapCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @type {Stream.Transform['_transform']} */
|
||||||
|
_transform(ck, _, cb) {
|
||||||
|
log(`Map {id: ${this.id}}#_transform(${ck}, _, _)`)
|
||||||
|
const r = this.f(ck)
|
||||||
|
log(` const r = (${r})`)
|
||||||
|
log(` cb(null, ${r})`)
|
||||||
|
cb(null, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class Zip extends Stream.Readable {
|
||||||
|
/** @type {Array<Stream.Readable>} */
|
||||||
|
streams = []
|
||||||
|
|
||||||
|
/** @type {Array<Array<unknown | null>>} */
|
||||||
|
buf = []
|
||||||
|
|
||||||
|
/** @type {Timer | null} */
|
||||||
|
readingTimer = null
|
||||||
|
/** @type {Timer | null} */
|
||||||
|
pushingTimer = null
|
||||||
|
|
||||||
|
/** @param {...Stream.Readable} streams */
|
||||||
|
constructor(...streams) {
|
||||||
|
super({objectMode: true})
|
||||||
|
this.id = zipCount++
|
||||||
|
this.streams = streams
|
||||||
|
this.streams.forEach((s, ix) => {
|
||||||
|
s.once('error', e => this.destroy(e))
|
||||||
|
s.once('end', () => this.push(null))
|
||||||
|
s.on('data', ck => {
|
||||||
|
this.bufput(ix, ck)
|
||||||
|
s.pause()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @type {(ix: number, val: unknown) => void} */
|
||||||
|
bufput(ix, val) {
|
||||||
|
if (this.buflastdone()) {
|
||||||
|
this.bufnew()
|
||||||
|
}
|
||||||
|
const zipped = this.buflast()
|
||||||
|
zipped[ix] = val
|
||||||
|
}
|
||||||
|
|
||||||
|
bufnew() {
|
||||||
|
this.buf.push(this.streams.map(_ => null))
|
||||||
|
}
|
||||||
|
|
||||||
|
buflastdone() {
|
||||||
|
const l = this.buflast()
|
||||||
|
return l.every(a => a !== null)
|
||||||
|
}
|
||||||
|
|
||||||
|
bufheaddone() {
|
||||||
|
const l = this.bufhead()
|
||||||
|
return l.every(a => a !== null)
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @returns {unknown[]} */
|
||||||
|
buflast() {
|
||||||
|
if (this.buf.length === 0) {
|
||||||
|
this.bufnew()
|
||||||
|
}
|
||||||
|
return this.buf[this.buf.length - 1]
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @returns {unknown[]} */
|
||||||
|
bufhead() {
|
||||||
|
if (this.buf.length === 0) {
|
||||||
|
this.bufnew()
|
||||||
|
}
|
||||||
|
return this.buf[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
anyended() {
|
||||||
|
return this.streams.some(s => s.readableEnded)
|
||||||
|
}
|
||||||
|
|
||||||
|
maybeend() {
|
||||||
|
if (!this.bufheaddone() && this.anyended()) {
|
||||||
|
this.push(null)
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
readzipped() {
|
||||||
|
if (this.anyended()) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.buflastdone()) {
|
||||||
|
this.bufnew()
|
||||||
|
this.streams.forEach((s, ix) => {
|
||||||
|
this.buflast()[ix] = s.read()
|
||||||
|
})
|
||||||
|
return this.buflastdone()
|
||||||
|
} else {
|
||||||
|
const missingIxs = this.buflast().flatMap((a, ix) => a === null ? [ix] : [])
|
||||||
|
missingIxs.forEach(ix => {
|
||||||
|
this.buflast()[ix] = this.streams[ix].read()
|
||||||
|
})
|
||||||
|
return this.buflastdone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_read() {
|
||||||
|
if (this.readingTimer) clearTimeout(this.readingTimer);
|
||||||
|
if (this.pushingTimer) clearTimeout(this.pushingTimer);
|
||||||
|
|
||||||
|
this.readingTimer = setTimeout(() => {
|
||||||
|
while (this.readzipped()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}, 0)
|
||||||
|
|
||||||
|
this.pushingTimer = setTimeout(() => {
|
||||||
|
let canWrite = true
|
||||||
|
|
||||||
|
if (this.maybeend()) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
while (canWrite && this.bufheaddone()) {
|
||||||
|
canWrite = this.push(this.bufhead())
|
||||||
|
this.buf.shift()
|
||||||
|
}
|
||||||
|
|
||||||
|
this.maybeend()
|
||||||
|
}, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class Compose extends Stream.Duplex {
|
||||||
|
/**
|
||||||
|
* @param {Stream.Readable | Stream.Transform} a
|
||||||
|
* @param {Stream.Transform} b
|
||||||
|
*/
|
||||||
|
constructor(a, b) {
|
||||||
|
super({objectMode: true})
|
||||||
|
this.id = composeCount++
|
||||||
|
|
||||||
|
this.a = a
|
||||||
|
this.b = b
|
||||||
|
|
||||||
|
log(`Compose {id: ${this.id}}#new()`)
|
||||||
|
log(` a.on('data', ...)`)
|
||||||
|
log(` a.once('end', ...)`)
|
||||||
|
log(` a.once('error', ...)`)
|
||||||
|
log(` a.pause()`)
|
||||||
|
log(` b.on('drain', ...)`)
|
||||||
|
log(` b.on('data', ...)`)
|
||||||
|
log(` b.on('error', ...)`)
|
||||||
|
log(` b.on('finish', ...)`)
|
||||||
|
|
||||||
|
this.a.once('end', () => {
|
||||||
|
log(`Compose {id: ${this.id}}#new()`)
|
||||||
|
log(` a.on('end', ...)`)
|
||||||
|
log(` b.end()`)
|
||||||
|
this.b.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
this.a.on('data', ck => {
|
||||||
|
log(`Compose {id: ${this.id}}#new()`)
|
||||||
|
log(` a.on('data', ...)`)
|
||||||
|
log(` b.write(${ck})`)
|
||||||
|
const canWrite = this.b.write(ck)
|
||||||
|
|
||||||
|
if (!canWrite) {
|
||||||
|
log(` a.pause()`)
|
||||||
|
this.a.pause()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
this.a.once('error', e => {
|
||||||
|
log(`Compose {id: ${this.id}}#new()`)
|
||||||
|
log(` a.once('error', ...)`)
|
||||||
|
log(` this.destroy(${e})`)
|
||||||
|
this.destroy(e)
|
||||||
|
this.b.destroy(e)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.b.on('drain', () => {
|
||||||
|
log(`Compose {id: ${this.id}}#new()`)
|
||||||
|
log(` b.on('drain', ...)`)
|
||||||
|
log(` this.a.resume()`)
|
||||||
|
this.a.resume()
|
||||||
|
})
|
||||||
|
|
||||||
|
this.b.on('data', ck => {
|
||||||
|
log(`Compose {id: ${this.id}}#new()`)
|
||||||
|
log(` b.on('data', ...)`)
|
||||||
|
log(` this.push(${ck})`)
|
||||||
|
const canPush = this.push(ck)
|
||||||
|
if (!canPush) {
|
||||||
|
log(` b.pause()`)
|
||||||
|
this.b.pause()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
this.b.once('error', e => {
|
||||||
|
log(`Compose {id: ${this.id}}#new()`)
|
||||||
|
log(` b.once('error', ...)`)
|
||||||
|
log(` this.destroy(${e})`)
|
||||||
|
this.destroy(e)
|
||||||
|
this.a.destroy(e)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.b.once('finish', () => {
|
||||||
|
log(`Compose {id: ${this.id}}#new()`)
|
||||||
|
log(` b.once('finish', ...)`)
|
||||||
|
log(` this.emit('finish')`)
|
||||||
|
this.push(null)
|
||||||
|
this.end()
|
||||||
|
this.emit('finish')
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
_read() {
|
||||||
|
log(`Compose {id: ${this.id}}#_read()`)
|
||||||
|
if (this.b.isPaused()) {
|
||||||
|
log(` b.resume()`)
|
||||||
|
this.b.resume()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @type {Stream.Duplex['_write']} */
|
||||||
|
_write(ck, _enc, cb) {
|
||||||
|
log(`Compose {id: ${this.id}}#_write(${ck}, _, _)`)
|
||||||
|
if (this.a instanceof Stream.Readable) {
|
||||||
|
throw new Error('Cannot `write` to a Readable stream')
|
||||||
|
}
|
||||||
|
|
||||||
|
log(` this.a.write(${ck}, _, _)`)
|
||||||
|
this.a.write(ck, _enc, cb)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @template T
|
||||||
|
*/
|
||||||
|
export class Bind extends Stream.Duplex {
|
||||||
|
/**
|
||||||
|
* @param {(t: T) => () => Stream.Readable} f
|
||||||
|
*/
|
||||||
|
constructor(f) {
|
||||||
|
super({objectMode: true, allowHalfOpen: true})
|
||||||
|
this.f = f
|
||||||
|
this.id = bindCount++
|
||||||
|
|
||||||
|
this.ix = 0
|
||||||
|
|
||||||
|
/** @type {Array<Stream.Readable>} */
|
||||||
|
this.streams = []
|
||||||
|
|
||||||
|
/** @type {(() => void) | undefined} */
|
||||||
|
this.doneCb = undefined
|
||||||
|
|
||||||
|
this.paused = true
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @type {NonNullable<Stream.Duplex['_final']>} */
|
||||||
|
_final(cb) {
|
||||||
|
log(`Bind {id: ${this.id}}#_final(_)`)
|
||||||
|
this.doneCb = cb
|
||||||
|
}
|
||||||
|
|
||||||
|
init() {
|
||||||
|
log(`Bind {id: ${this.id}}#init()`)
|
||||||
|
|
||||||
|
const s = this.streams[this.ix]
|
||||||
|
if (this.paused || (!s && !this.doneCb)) {
|
||||||
|
this.paused = true
|
||||||
|
return
|
||||||
|
} else if (this.doneCb) {
|
||||||
|
log(` this.doneCb()`)
|
||||||
|
this.doneCb()
|
||||||
|
this.doneCb = undefined
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log(` s.on('data', ...)`)
|
||||||
|
s.on('data', ck => {
|
||||||
|
log(`Bind {id: ${this.id}}#initcur()`)
|
||||||
|
log(` s.on('data', ...)`)
|
||||||
|
log(` this.push(${ck})`)
|
||||||
|
const canPush = this.push(ck)
|
||||||
|
if (!canPush) {
|
||||||
|
s.pause()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
log(` s.once('end', ...)`)
|
||||||
|
s.once('end', () => {
|
||||||
|
log(`Bind {id: ${this.id}}#initcur()`)
|
||||||
|
log(` s.once('end', ...)`)
|
||||||
|
log(` this.ix++`)
|
||||||
|
log(` this.init()`)
|
||||||
|
this.ix++
|
||||||
|
this.init()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @type {Stream.Duplex['_write']} */
|
||||||
|
_write(ck, _, cb) {
|
||||||
|
log(`Bind {id: ${this.id}}#_write(${ck}, _, _)`)
|
||||||
|
try {
|
||||||
|
log(` this.streams = ${JSON.stringify(this.streams.map(_ => 'Readable'))}`)
|
||||||
|
this.streams.push(this.f(ck)())
|
||||||
|
if (this.paused) {
|
||||||
|
log(` this.init()`)
|
||||||
|
this.paused = false
|
||||||
|
this.init()
|
||||||
|
}
|
||||||
|
log(` cb()`)
|
||||||
|
cb()
|
||||||
|
} catch(e) {
|
||||||
|
log(` cb(${e})`)
|
||||||
|
// @ts-ignore
|
||||||
|
cb(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @type {Stream.Duplex['_read']} */
|
||||||
|
_read() {
|
||||||
|
log(`Bind {id: ${this.id}}#_read()`)
|
||||||
|
this.streams.forEach(s =>
|
||||||
|
s.isPaused() ? s.resume() : undefined)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @type {() => Stream.Readable} */
|
||||||
|
export const neverImpl = () => new Never();
|
||||||
|
|
||||||
|
/** @type {<T>(a: T) => () => Stream.Transform} */
|
||||||
|
export const constImpl = (a) => () => new Const(a);
|
||||||
|
|
||||||
|
/** @type {<T>(a: T) => () => Stream.Readable} */
|
||||||
|
export const onceImpl = (a) => () => new Once(a);
|
||||||
|
|
||||||
|
/** @type {<T>(a: () => Promise<T>) => () => Stream.Readable} */
|
||||||
|
export const fromPromiseImpl = (a) => () => new FromPromise(a())
|
||||||
|
|
||||||
|
/** @type {(ss: Array<Stream.Readable>) => () => Stream.Readable} */
|
||||||
|
export const chainImpl = (ss) => () => new Chain(...ss);
|
||||||
|
|
||||||
|
/** @type {<T, R>(f: (t: T) => R) => () => Stream.Transform} */
|
||||||
|
export const mapImpl = (f) => () => new Map(f);
|
||||||
|
|
||||||
|
/** @type {(iab: Stream.Readable) => (ia: Stream.Readable) => () => Stream.Readable} */
|
||||||
|
export const applyImpl = (iab) => (ia) => () =>
|
||||||
|
new Compose(new Zip(iab, ia), new Map(([ab, a]) => ab(a)))
|
||||||
|
|
||||||
|
/** @type {<T>(f: (t: T) => () => Stream.Readable) => () => Stream.Duplex} */
|
||||||
|
export const bindImpl = (f) => () => new Bind(f)
|
||||||
|
|
||||||
|
/** @type {(a: Stream.Transform | Stream.Readable) => (b: Stream.Transform) => () => Stream.Duplex} */
|
||||||
|
export const pipeImpl = (a) => (b) => () => new Compose(a, b)
|
||||||
|
|
||||||
|
process.on('beforeExit', () => {
|
||||||
|
debugger;
|
||||||
|
})
|
189
src/Node.Stream.Object.purs
Normal file
189
src/Node.Stream.Object.purs
Normal file
@ -0,0 +1,189 @@
|
|||||||
|
module Node.Stream.Object where
|
||||||
|
|
||||||
|
import Prelude
|
||||||
|
|
||||||
|
import Control.Monad.Rec.Class (untilJust)
|
||||||
|
import Control.Monad.ST.Global as ST
|
||||||
|
import Control.Monad.ST.Ref as STRef
|
||||||
|
import Control.Promise (Promise)
|
||||||
|
import Control.Promise as Promise
|
||||||
|
import Data.Array as Array
|
||||||
|
import Data.Array.ST as Array.ST
|
||||||
|
import Data.Either (Either(..))
|
||||||
|
import Data.Maybe (Maybe(..))
|
||||||
|
import Data.Newtype (class Newtype, unwrap, wrap)
|
||||||
|
import Data.Profunctor (class Profunctor)
|
||||||
|
import Data.Traversable (class Traversable, traverse)
|
||||||
|
import Effect (Effect)
|
||||||
|
import Effect.Aff (Aff, delay, effectCanceler, launchAff_, makeAff)
|
||||||
|
import Effect.Aff.Class (class MonadAff)
|
||||||
|
import Effect.Class (class MonadEffect, liftEffect)
|
||||||
|
import Effect.Exception (Error)
|
||||||
|
import Effect.Uncurried (mkEffectFn1)
|
||||||
|
import Node.Buffer (Buffer)
|
||||||
|
import Node.EventEmitter (EventHandle(..))
|
||||||
|
import Node.EventEmitter as Event
|
||||||
|
import Node.EventEmitter.UtilTypes (EventHandle1, EventHandle0)
|
||||||
|
import Node.Stream (Readable, Writable)
|
||||||
|
import Unsafe.Coerce (unsafeCoerce)
|
||||||
|
|
||||||
|
foreign import data Stream :: Type -> Type -> Type
|
||||||
|
|
||||||
|
newtype ObjectStream :: Type -> Type -> Type
|
||||||
|
newtype ObjectStream i o = ObjectStream (Effect (Stream i o))
|
||||||
|
|
||||||
|
derive instance Newtype (ObjectStream i o) _
|
||||||
|
|
||||||
|
instance Functor (ObjectStream i) where
|
||||||
|
map :: forall a b. (a -> b) -> ObjectStream i a -> ObjectStream i b
|
||||||
|
map ab (ObjectStream ia) = wrap $ join $ pure pipeImpl <*> ia <*> mapImpl ab
|
||||||
|
|
||||||
|
instance Apply (ObjectStream i) where
|
||||||
|
apply :: forall a b. ObjectStream i (a -> b) -> ObjectStream i a -> ObjectStream i b
|
||||||
|
apply (ObjectStream iab) (ObjectStream ia) = wrap $ join $ pure applyImpl <*> iab <*> ia
|
||||||
|
|
||||||
|
instance Applicative (ObjectStream i) where
|
||||||
|
pure = once
|
||||||
|
|
||||||
|
instance Monad (ObjectStream i)
|
||||||
|
|
||||||
|
instance MonadEffect (ObjectStream Unit) where
|
||||||
|
liftEffect = wrap <<< flip bind onceImpl
|
||||||
|
|
||||||
|
instance MonadAff (ObjectStream Unit) where
|
||||||
|
liftAff = wrap <<< fromPromiseImpl <<< Promise.fromAff
|
||||||
|
|
||||||
|
instance Bind (ObjectStream i) where
|
||||||
|
bind (ObjectStream sia') asb = wrap do
|
||||||
|
sia <- sia'
|
||||||
|
sab <- bindImpl (unwrap <<< asb)
|
||||||
|
pipeImpl sia sab
|
||||||
|
|
||||||
|
instance Profunctor ObjectStream where
|
||||||
|
dimap ab cd (ObjectStream sbc') = wrap do
|
||||||
|
sbc <- sbc'
|
||||||
|
sab <- mapImpl ab
|
||||||
|
scd <- mapImpl cd
|
||||||
|
sac <- pipeImpl sab sbc
|
||||||
|
pipeImpl sac scd
|
||||||
|
|
||||||
|
-- | A stream that will emit the value `a` exactly once, and ignores any written chunks.
|
||||||
|
once :: forall i a. a -> ObjectStream i a
|
||||||
|
once = wrap <<< onceImpl
|
||||||
|
|
||||||
|
-- | A stream that will immediately emit `close` and `end` events.
|
||||||
|
never :: forall a. ObjectStream Unit a
|
||||||
|
never = wrap neverImpl
|
||||||
|
|
||||||
|
-- | A stream that for all input chunks, emits `unit`.
|
||||||
|
sink :: forall a. ObjectStream a Unit
|
||||||
|
sink = pure unit
|
||||||
|
|
||||||
|
-- | Create a stream from a `Foldable` of `a`s
|
||||||
|
fromFoldable :: forall f a. Traversable f => f a -> ObjectStream Unit a
|
||||||
|
fromFoldable = chainMany <<< map once
|
||||||
|
|
||||||
|
-- | Convert a `Readable` stream emitting `Buffer` chunks to an `ObjectStream`
|
||||||
|
fromBufferReadable :: forall r. Readable r -> ObjectStream Unit Buffer
|
||||||
|
fromBufferReadable r = wrap $ pure $ (unsafeCoerce :: Readable r -> Stream Unit Buffer) r
|
||||||
|
|
||||||
|
-- | Convert a `Readable` stream emitting `String` chunks to an `ObjectStream`
|
||||||
|
fromStringReadable :: forall r. Readable r -> ObjectStream Unit String
|
||||||
|
fromStringReadable r = wrap $ pure $ (unsafeCoerce :: Readable r -> Stream Unit String) r
|
||||||
|
|
||||||
|
-- | Convert a `Writable` stream accepting `Buffer` chunks to an `ObjectStream`
|
||||||
|
fromBufferWritable :: forall r. Writable r -> ObjectStream Buffer Unit
|
||||||
|
fromBufferWritable r = wrap $ pure $ (unsafeCoerce :: Writable r -> Stream Buffer Unit) r
|
||||||
|
|
||||||
|
-- | Convert a `Writable` stream accepting `String` chunks to an `ObjectStream`
|
||||||
|
fromStringWritable :: forall r. Writable r -> ObjectStream String Unit
|
||||||
|
fromStringWritable r = wrap $ pure $ (unsafeCoerce :: Writable r -> Stream String Unit) r
|
||||||
|
|
||||||
|
-- | Emit chunks from the first stream, then when exhausted emit chunks from the second.
|
||||||
|
chain :: forall a. ObjectStream Unit a -> ObjectStream Unit a -> ObjectStream Unit a
|
||||||
|
chain a b = chainMany [ a, b ]
|
||||||
|
|
||||||
|
-- | `chain` for an arbitrary number of streams.
|
||||||
|
chainMany :: forall f a. Traversable f => f (ObjectStream Unit a) -> ObjectStream Unit a
|
||||||
|
chainMany as' = wrap do
|
||||||
|
as <- Array.fromFoldable <$> traverse unwrap as'
|
||||||
|
chainImpl as
|
||||||
|
|
||||||
|
run_ :: forall a. ObjectStream Unit a -> Aff Unit
|
||||||
|
run_ = void <<< run
|
||||||
|
|
||||||
|
run :: forall a. ObjectStream Unit a -> Aff (Array a)
|
||||||
|
run (ObjectStream s') = do
|
||||||
|
runningCount <- liftEffect $ ST.toEffect $ STRef.new 0
|
||||||
|
values <- liftEffect $ ST.toEffect $ Array.ST.new
|
||||||
|
s <- liftEffect s'
|
||||||
|
makeAff \res ->
|
||||||
|
let
|
||||||
|
onData a = ST.toEffect do
|
||||||
|
void $ STRef.modify (_ + 1) runningCount
|
||||||
|
void $ Array.ST.push a values
|
||||||
|
void $ STRef.modify (_ - 1) runningCount
|
||||||
|
onError e = res $ Left e
|
||||||
|
onEnd = launchAff_ do
|
||||||
|
untilJust do
|
||||||
|
delay $ wrap $ 1.0
|
||||||
|
running <- liftEffect $ ST.toEffect $ STRef.read runningCount
|
||||||
|
pure $ if running == 0 then Just unit else Nothing
|
||||||
|
values' <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze values
|
||||||
|
liftEffect $ res $ Right values'
|
||||||
|
in
|
||||||
|
do
|
||||||
|
cancelData <- Event.on dataH onData s
|
||||||
|
cancelError <- Event.on errorH onError s
|
||||||
|
cancelEnd <- Event.on endH onEnd s
|
||||||
|
pure $ effectCanceler do
|
||||||
|
cancelData
|
||||||
|
cancelError
|
||||||
|
cancelEnd
|
||||||
|
|
||||||
|
-- | Constructs a Stream that re-emits the outputs from each stream, in order.
|
||||||
|
foreign import chainImpl :: forall a. Array (Stream Unit a) -> Effect (Stream Unit a)
|
||||||
|
|
||||||
|
-- | Pipes a stream's output into another's input, returning the new composite stream.
|
||||||
|
-- |
|
||||||
|
-- | Note that this differs from `Readable#pipe`, which returns the destination stream
|
||||||
|
-- | verbatim to allow chained piping of only _outputs_.
|
||||||
|
foreign import pipeImpl :: forall a b c. Stream a b -> Stream b c -> Effect (Stream a c)
|
||||||
|
|
||||||
|
-- | A readable stream that immediately closes without emitting any chunks.
|
||||||
|
foreign import neverImpl :: forall a. Effect (Stream Unit a)
|
||||||
|
|
||||||
|
-- | Constructs a readable stream from an asynchronous value.
|
||||||
|
foreign import fromPromiseImpl :: forall a. Effect (Promise a) -> Effect (Stream Unit a)
|
||||||
|
|
||||||
|
-- | Constructs a readable stream that emits a single value then closes.
|
||||||
|
foreign import onceImpl :: forall i a. a -> Effect (Stream i a)
|
||||||
|
|
||||||
|
-- | Constructs a `Transform` applying the given function to chunks.
|
||||||
|
foreign import mapImpl :: forall a b. (a -> b) -> Effect (Stream a b)
|
||||||
|
|
||||||
|
-- | Constructs a `Transform` zipping functions from the left stream with values from the right stream.
|
||||||
|
-- |
|
||||||
|
-- | Closes when either stream closes.
|
||||||
|
foreign import applyImpl :: forall i a b. Stream i (a -> b) -> Stream i a -> Effect (Stream i b)
|
||||||
|
|
||||||
|
-- | Constructs a `Transform` which applies a function to each written chunk.
|
||||||
|
-- |
|
||||||
|
-- | The values emitted by the stream returned by this function are then emitted
|
||||||
|
-- | until the temporary stream closes.
|
||||||
|
foreign import bindImpl :: forall a _x b. (a -> Effect (Stream _x b)) -> Effect (Stream a b)
|
||||||
|
|
||||||
|
dataH :: forall i o. EventHandle1 (Stream i o) o
|
||||||
|
dataH = EventHandle "data" mkEffectFn1
|
||||||
|
|
||||||
|
readableH :: forall i o. EventHandle0 (Stream i o)
|
||||||
|
readableH = EventHandle "readable" identity
|
||||||
|
|
||||||
|
closeH :: forall i o. EventHandle0 (Stream i o)
|
||||||
|
closeH = EventHandle "close" identity
|
||||||
|
|
||||||
|
endH :: forall i o. EventHandle0 (Stream i o)
|
||||||
|
endH = EventHandle "end" identity
|
||||||
|
|
||||||
|
errorH :: forall i o. EventHandle1 (Stream i o) Error
|
||||||
|
errorH = EventHandle "error" mkEffectFn1
|
@ -2,11 +2,13 @@ module Test.Main where
|
|||||||
|
|
||||||
import Prelude
|
import Prelude
|
||||||
|
|
||||||
|
import Data.Maybe (Maybe(..))
|
||||||
import Effect (Effect)
|
import Effect (Effect)
|
||||||
import Effect.Class.Console (log)
|
import Effect.Aff (launchAff_)
|
||||||
|
import Test.Node.Stream.Object as Test.Node.Stream.Object
|
||||||
|
import Test.Spec.Reporter (consoleReporter, specReporter)
|
||||||
|
import Test.Spec.Runner (defaultConfig, runSpec')
|
||||||
|
|
||||||
main :: Effect Unit
|
main :: Effect Unit
|
||||||
main = do
|
main = launchAff_ $ runSpec' (defaultConfig { timeout = Nothing }) [ consoleReporter ] do
|
||||||
log "🍕"
|
Test.Node.Stream.Object.spec
|
||||||
log "You should add some tests."
|
|
||||||
|
|
||||||
|
57
test/Test/Node.Stream.Object.purs
Normal file
57
test/Test/Node.Stream.Object.purs
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
module Test.Node.Stream.Object where
|
||||||
|
|
||||||
|
import Prelude
|
||||||
|
|
||||||
|
import Data.Newtype (wrap)
|
||||||
|
import Effect.Aff (delay)
|
||||||
|
import Effect.Aff.Class (liftAff)
|
||||||
|
import Node.Stream.Object as Stream
|
||||||
|
import Test.Spec (Spec, describe, it)
|
||||||
|
import Test.Spec.Assertions (shouldEqual)
|
||||||
|
|
||||||
|
spec :: Spec Unit
|
||||||
|
spec =
|
||||||
|
describe "Node.Stream.Object" do
|
||||||
|
describe "ObjectStream" do
|
||||||
|
describe "once" do
|
||||||
|
it "emits once" do
|
||||||
|
out <- Stream.run (Stream.once 1)
|
||||||
|
out `shouldEqual` [ 1 ]
|
||||||
|
describe "never" do
|
||||||
|
it "immediately closes" do
|
||||||
|
out <- Stream.run Stream.never
|
||||||
|
out `shouldEqual` ([] :: Array Int)
|
||||||
|
describe "chain" do
|
||||||
|
it "noops" do
|
||||||
|
out <- Stream.run $ Stream.chainMany []
|
||||||
|
out `shouldEqual` ([] :: Array Int)
|
||||||
|
it "works with 1 stream" do
|
||||||
|
out <- Stream.run $ Stream.chainMany [Stream.once 1]
|
||||||
|
out `shouldEqual` [1]
|
||||||
|
it "works with 2 streams" do
|
||||||
|
out <- Stream.run $ Stream.chainMany [Stream.once 1, Stream.once 2]
|
||||||
|
out `shouldEqual` [1, 2]
|
||||||
|
it "does not emit end until last child stream ends" do
|
||||||
|
let
|
||||||
|
delayed n a = liftAff do
|
||||||
|
delay $ wrap n
|
||||||
|
pure a
|
||||||
|
out <- Stream.run $ Stream.chainMany [delayed 10.0 1, delayed 20.0 2]
|
||||||
|
out `shouldEqual` [1, 2]
|
||||||
|
describe "fromFoldable" do
|
||||||
|
it "creates an empty readable" do
|
||||||
|
out <- Stream.run (Stream.fromFoldable [] :: Stream.ObjectStream Unit (Array Int))
|
||||||
|
out `shouldEqual` []
|
||||||
|
it "creates a readable that emits each element" do
|
||||||
|
out <- Stream.run (Stream.fromFoldable [ 1, 2, 3 ])
|
||||||
|
out `shouldEqual` [ 1, 2, 3 ]
|
||||||
|
it "bind maps each number" do
|
||||||
|
out <- Stream.run do
|
||||||
|
a <- Stream.fromFoldable [ 1, 2, 3 ]
|
||||||
|
pure $ a + 1
|
||||||
|
out `shouldEqual` [ 2, 3, 4 ]
|
||||||
|
it "bind fans out" do
|
||||||
|
out <- Stream.run do
|
||||||
|
a <- Stream.fromFoldable [ 1, 2, 3 ]
|
||||||
|
Stream.fromFoldable [a * 10, a * 20]
|
||||||
|
out `shouldEqual` [ 10, 20, 20, 40, 30, 60 ]
|
Loading…
Reference in New Issue
Block a user