Compare commits

...

3 Commits

Author SHA1 Message Date
bbbd84c640
wip 2024-05-09 11:01:09 -05:00
ad04aab031
fix: closer 2024-05-07 11:00:31 -05:00
f3e0a6095e
wip: object streasm 2024-05-06 18:41:11 -05:00
9 changed files with 8867 additions and 5 deletions

7941
index.js Executable file

File diff suppressed because it is too large Load Diff

7
index.js.map Normal file

File diff suppressed because one or more lines are too long

View File

@ -1,6 +1,7 @@
{ {
"name": "purescript-csv-stream", "name": "purescript-csv-stream",
"version": "v1.2.17", "version": "v1.2.17",
"type": "module",
"dependencies": { "dependencies": {
"csv-parse": "^5.5.5", "csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6" "csv-stringify": "^6.4.6"

View File

@ -4,6 +4,7 @@ workspace:
path: ./ path: ./
dependencies: dependencies:
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- aff-promise
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0" - bifunctors: ">=6.0.0 <7.0.0"
- control: ">=6.0.0 <7.0.0" - control: ">=6.0.0 <7.0.0"
@ -27,6 +28,7 @@ workspace:
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- precise-datetime: ">=7.0.0 <8.0.0" - precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- profunctor
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"
- record-extra: ">=5.0.1 <6.0.0" - record-extra: ">=5.0.1 <6.0.0"
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
@ -37,11 +39,16 @@ workspace:
- unsafe-coerce: ">=6.0.0 <7.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies: test_dependencies:
- console - console
- spec
build_plan: build_plan:
- aff - aff
- aff-promise
- ansi
- arraybuffer-types - arraybuffer-types
- arrays - arrays
- avar
- bifunctors - bifunctors
- catenable-lists
- console - console
- const - const
- contravariant - contravariant
@ -59,7 +66,9 @@ workspace:
- foldable-traversable - foldable-traversable
- foreign - foreign
- foreign-object - foreign-object
- fork
- formatters - formatters
- free
- functions - functions
- functors - functors
- gen - gen
@ -70,6 +79,7 @@ workspace:
- lazy - lazy
- lists - lists
- maybe - maybe
- mmorph
- newtype - newtype
- node-buffer - node-buffer
- node-event-emitter - node-event-emitter
@ -83,6 +93,7 @@ workspace:
- parallel - parallel
- parsing - parsing
- partial - partial
- pipes
- precise-datetime - precise-datetime
- prelude - prelude
- profunctor - profunctor
@ -90,6 +101,7 @@ workspace:
- record-extra - record-extra
- refs - refs
- safe-coerce - safe-coerce
- spec
- st - st
- strings - strings
- tailrec - tailrec
@ -124,6 +136,21 @@ packages:
- tailrec - tailrec
- transformers - transformers
- unsafe-coerce - unsafe-coerce
aff-promise:
type: registry
version: 4.0.0
integrity: sha256-Kq5EupbUpXeUXx4JqGQE7/RTTz/H6idzWhsocwlEFhM=
dependencies:
- aff
- foreign
ansi:
type: registry
version: 7.0.0
integrity: sha256-ZMB6HD+q9CXvn9fRCmJ8dvuDrOVHcjombL3oNOerVnE=
dependencies:
- foldable-traversable
- lists
- strings
arraybuffer-types: arraybuffer-types:
type: registry type: registry
version: 3.0.2 version: 3.0.2
@ -148,6 +175,17 @@ packages:
- tuples - tuples
- unfoldable - unfoldable
- unsafe-coerce - unsafe-coerce
avar:
type: registry
version: 5.0.0
integrity: sha256-e7hf0x4hEpcygXP0LtvfvAQ49Bbj2aWtZT3gqM///0A=
dependencies:
- aff
- effect
- either
- exceptions
- functions
- maybe
bifunctors: bifunctors:
type: registry type: registry
version: 6.0.0 version: 6.0.0
@ -158,6 +196,18 @@ packages:
- newtype - newtype
- prelude - prelude
- tuples - tuples
catenable-lists:
type: registry
version: 7.0.0
integrity: sha256-76vYENhwF4BWTBsjeLuErCH2jqVT4M3R1HX+4RwSftA=
dependencies:
- control
- foldable-traversable
- lists
- maybe
- prelude
- tuples
- unfoldable
console: console:
type: registry type: registry
version: 6.1.0 version: 6.1.0
@ -339,6 +389,12 @@ packages:
- tuples - tuples
- typelevel-prelude - typelevel-prelude
- unfoldable - unfoldable
fork:
type: registry
version: 6.0.0
integrity: sha256-X7u0SuCvFbLbzuNEKLBNuWjmcroqMqit4xEzpQwAP7E=
dependencies:
- aff
formatters: formatters:
type: registry type: registry
version: 7.0.0 version: 7.0.0
@ -351,6 +407,25 @@ packages:
- parsing - parsing
- prelude - prelude
- transformers - transformers
free:
type: registry
version: 7.1.0
integrity: sha256-JAumgEsGSzJCNLD8AaFvuX7CpqS5yruCngi6yI7+V5k=
dependencies:
- catenable-lists
- control
- distributive
- either
- exists
- foldable-traversable
- invariant
- lazy
- maybe
- prelude
- tailrec
- transformers
- tuples
- unsafe-coerce
functions: functions:
type: registry type: registry
version: 6.0.0 version: 6.0.0
@ -460,6 +535,14 @@ packages:
- invariant - invariant
- newtype - newtype
- prelude - prelude
mmorph:
type: registry
version: 7.0.0
integrity: sha256-urZlZNNqGeQFe5D/ClHlR8QgGBNHTMFPtJ5S5IpflTQ=
dependencies:
- free
- functors
- transformers
newtype: newtype:
type: registry type: registry
version: 5.0.0 version: 5.0.0
@ -610,6 +693,18 @@ packages:
version: 4.0.0 version: 4.0.0
integrity: sha256-fwXerld6Xw1VkReh8yeQsdtLVrjfGiVuC5bA1Wyo/J4= integrity: sha256-fwXerld6Xw1VkReh8yeQsdtLVrjfGiVuC5bA1Wyo/J4=
dependencies: [] dependencies: []
pipes:
type: registry
version: 8.0.0
integrity: sha256-kvfqGM4cPA/wCcBHbp5psouFw5dZGvku2462x7ZBwSY=
dependencies:
- aff
- lists
- mmorph
- prelude
- tailrec
- transformers
- tuples
precise-datetime: precise-datetime:
type: registry type: registry
version: 7.0.0 version: 7.0.0
@ -683,6 +778,38 @@ packages:
integrity: sha256-a1ibQkiUcbODbLE/WAq7Ttbbh9ex+x33VCQ7GngKudU= integrity: sha256-a1ibQkiUcbODbLE/WAq7Ttbbh9ex+x33VCQ7GngKudU=
dependencies: dependencies:
- unsafe-coerce - unsafe-coerce
spec:
type: registry
version: 7.6.0
integrity: sha256-+merGdQbL9zWONbnt8S8J9afGJ59MQqGtS0qSd3yu4I=
dependencies:
- aff
- ansi
- arrays
- avar
- bifunctors
- control
- datetime
- effect
- either
- exceptions
- foldable-traversable
- fork
- identity
- integers
- lists
- maybe
- newtype
- now
- ordered-collections
- parallel
- pipes
- prelude
- refs
- strings
- tailrec
- transformers
- tuples
st: st:
type: registry type: registry
version: 6.2.0 version: 6.2.0

View File

@ -10,6 +10,8 @@ package:
strict: true strict: true
pedanticPackages: true pedanticPackages: true
dependencies: dependencies:
- aff-promise
- profunctor
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0" - bifunctors: ">=6.0.0 <7.0.0"
@ -46,5 +48,6 @@ package:
main: Test.Main main: Test.Main
dependencies: dependencies:
- console - console
- spec
workspace: workspace:
extraPackages: {} extraPackages: {}

535
src/Node.Stream.Object.js Normal file
View File

@ -0,0 +1,535 @@
import Stream from "stream";

// Tracing is opt-in: enabled only when the env var is set to a non-empty
// value. (The previous check `!== ''` was true when the variable was UNSET,
// since `undefined !== ''`, which turned debug logging on by default.)
const DEBUG = Boolean(process.env['NODEJS_OBJECT_STREAM_TRACE'])

/** Log `m` when tracing is enabled; no-op otherwise. @type {(s: string) => void} */
const log = m => DEBUG ? console.log(m) : undefined;

// Monotonic per-class instance counters, used only to tag trace lines.
let chainCount = 0
let composeCount = 0
let neverCount = 0
let onceCount = 0
let bindCount = 0
let zipCount = 0
let mapCount = 0
let constCount = 0
let fromPromiseCount = 0
/**
 * A readable that never emits a chunk: the first read request immediately
 * signals end-of-stream.
 */
export class Never extends Stream.Readable {
  constructor() {
    super({objectMode: true})
    this.id = neverCount
    neverCount += 1
  }

  /** Signal EOF as soon as the consumer asks for data. */
  _read() {
    log(`Never {id: ${this.id}}#_read()`)
    log(` this.push(null)`)
    this.push(null)
  }
}
/**
 * A duplex that emits the single value given at construction, then ends.
 * Written chunks are accepted and discarded; `allowHalfOpen: false` closes
 * the writable side together with the readable side.
 * @template T
 */
export class Once extends Stream.Duplex {
  /** @param {T} value the one chunk to emit */
  constructor(value) {
    super({objectMode: true, allowHalfOpen: false})
    this.a = value
    this.id = onceCount
    onceCount += 1
    this.push(this.a)
    this.push(null)
    log(`Once {id: ${this.id}}#new()`)
    log(` this.push(${value})`)
    log(` this.push(null)`)
  }

  /** Discard the incoming chunk. @type {Stream.Duplex['_write']} */
  _write(_chunk, _encoding, done) {
    done()
  }

  /** No-op: both pushes already happened eagerly in the constructor. */
  _read() {
    log(`Once {id: ${this.id}}#_read()`)
  }
}
/**
 * A transform that replaces every incoming chunk with a constant value.
 * @template T
 */
export class Const extends Stream.Transform {
  /** @param {T} value the constant emitted for each input chunk */
  constructor(value) {
    super({objectMode: true})
    this.a = value
    this.id = constCount
    constCount += 1
  }

  /** Map the chunk to the stored constant. @type {Stream.Transform['_transform']} */
  _transform(_chunk, _encoding, done) {
    log(`Const {id: ${this.id}}#_transform(${_chunk}, _, _)`)
    log(` cb(${this.a})`)
    // Equivalent to `this.push(this.a); done()`.
    done(null, this.a)
  }
}
/**
 * A readable backed by a promise: on resolution it emits the value as one
 * chunk and ends; on rejection it destroys the stream with the error.
 * @template T
 */
export class FromPromise extends Stream.Readable {
  /** @param {Promise<T>} p */
  constructor(p) {
    super({objectMode: true})
    this.id = fromPromiseCount
    fromPromiseCount += 1
    const onResolved = (a) => {
      log(`FromPromise {id: ${this.id}}#new()`)
      log(` ...p.then(...)`)
      log(` this.push(${a})`)
      log(` this.push(null)`)
      this.push(a)
      this.push(null)
    }
    const onRejected = (e) => {
      log(`FromPromise {id: ${this.id}}#new()`)
      log(` ...p.catch(...)`)
      log(` this.destroy(${e})`)
      this.destroy(e)
    }
    p.then(onResolved).catch(onRejected)
  }

  /** No-op: pushing happens when the promise settles. */
  _read() {
    log(`FromPromise {id: ${this.id}}#_read()`)
  }
}
/**
 * Concatenates several readables: re-emits every chunk from the first
 * source, then the second, and so on; ends after the last source ends.
 * @template T
 */
export class Chain extends Stream.Readable {
  /** @param {...Stream.Readable} streams sources, consumed in order */
  constructor(...streams) {
    super({objectMode: true})
    this.id = chainCount++
    // Index of the source currently being drained; -1 until next() runs.
    this.ix = -1
    this.streams = streams
    this.next()
  }

  // Advance to the next source, or push EOF once all sources are exhausted.
  next() {
    log(`Chain {id: ${this.id}}#next()`)
    this.ix++
    if (this.ix === this.streams.length) {
      log(` this.push(null)`)
      this.push(null)
    } else {
      const cur = this.streams[this.ix]
      // An error in any source destroys the whole chain.
      cur.once('error', e => {
        log(`Chain {id: ${this.id}}#next()`)
        log(` cur.once('error', ...)`)
        log(` this.destroy(${e})`)
        this.destroy(e)
      })
      cur.once('end', () => {
        log(`Chain {id: ${this.id}}#next()`)
        log(` cur.once('end', ...)`)
        log(` this.next()`)
        this.next()
      })
      // Attaching 'data' puts `cur` into flowing mode.
      cur.on('data', ck => {
        log(`Chain {id: ${this.id}}#next()`)
        log(` cur.on('data', ...)`)
        log(` this.push(${ck})`)
        const canPush = this.push(ck)
        // Respect downstream backpressure: pause until _read resumes us.
        // (`cur &&` is always truthy here — defensive leftover.)
        if (cur && !canPush) {
          log(` cur.pause()`)
          cur.pause()
        }
      })
    }
  }

  _read() {
    log(`Chain {id: ${this.id}}#_read()`)
    // NOTE(review): this resumes EVERY paused source, not just the current
    // one; a later source resumed before its 'data' handler is attached in
    // next() would drop chunks — confirm sources start paused until chained.
    this.streams.forEach(s => {
      if (s.isPaused()) {
        log(` s.resume()`)
        s.resume()
      }
    })
  }
}
/**
 * A transform applying a pure function to every chunk.
 * @template T
 * @template R
 */
export class Map extends Stream.Transform {
  /** @param {(t: T) => R} f mapping applied to each chunk */
  constructor(f) {
    super({objectMode: true})
    this.f = f
    this.id = mapCount
    mapCount += 1
  }

  /** Apply `f` and forward the result downstream. @type {Stream.Transform['_transform']} */
  _transform(chunk, _encoding, done) {
    log(`Map {id: ${this.id}}#_transform(${chunk}, _, _)`)
    const mapped = this.f(chunk)
    log(` const r = (${mapped})`)
    log(` cb(null, ${mapped})`)
    done(null, mapped)
  }
}
export class Zip extends Stream.Readable {
/** @type {Array<Stream.Readable>} */
streams = []
/** @type {Array<Array<unknown | null>>} */
buf = []
/** @type {Timer | null} */
readingTimer = null
/** @type {Timer | null} */
pushingTimer = null
/** @param {...Stream.Readable} streams */
constructor(...streams) {
super({objectMode: true})
this.id = zipCount++
this.streams = streams
this.streams.forEach((s, ix) => {
s.once('error', e => this.destroy(e))
s.once('end', () => this.push(null))
s.on('data', ck => {
this.bufput(ix, ck)
s.pause()
})
})
}
/** @type {(ix: number, val: unknown) => void} */
bufput(ix, val) {
if (this.buflastdone()) {
this.bufnew()
}
const zipped = this.buflast()
zipped[ix] = val
}
bufnew() {
this.buf.push(this.streams.map(_ => null))
}
buflastdone() {
const l = this.buflast()
return l.every(a => a !== null)
}
bufheaddone() {
const l = this.bufhead()
return l.every(a => a !== null)
}
/** @returns {unknown[]} */
buflast() {
if (this.buf.length === 0) {
this.bufnew()
}
return this.buf[this.buf.length - 1]
}
/** @returns {unknown[]} */
bufhead() {
if (this.buf.length === 0) {
this.bufnew()
}
return this.buf[0]
}
anyended() {
return this.streams.some(s => s.readableEnded)
}
maybeend() {
if (!this.bufheaddone() && this.anyended()) {
this.push(null)
return true
} else {
return false
}
}
readzipped() {
if (this.anyended()) {
return false
}
if (this.buflastdone()) {
this.bufnew()
this.streams.forEach((s, ix) => {
this.buflast()[ix] = s.read()
})
return this.buflastdone()
} else {
const missingIxs = this.buflast().flatMap((a, ix) => a === null ? [ix] : [])
missingIxs.forEach(ix => {
this.buflast()[ix] = this.streams[ix].read()
})
return this.buflastdone()
}
}
_read() {
if (this.readingTimer) clearTimeout(this.readingTimer);
if (this.pushingTimer) clearTimeout(this.pushingTimer);
this.readingTimer = setTimeout(() => {
while (this.readzipped()) {
continue;
}
}, 0)
this.pushingTimer = setTimeout(() => {
let canWrite = true
if (this.maybeend()) {
return
}
while (canWrite && this.bufheaddone()) {
canWrite = this.push(this.bufhead())
this.buf.shift()
}
this.maybeend()
}, 0)
}
}
/**
 * Pipes readable `a` into transform `b`, presenting the pair as one duplex:
 * writes go to `a` (when `a` is writable) and reads come from `b`.
 * Backpressure propagates both ways via pause/resume, and an error on
 * either side destroys both.
 */
export class Compose extends Stream.Duplex {
  /**
   * @param {Stream.Readable | Stream.Transform} a upstream half
   * @param {Stream.Transform} b downstream half
   */
  constructor(a, b) {
    super({objectMode: true})
    this.id = composeCount++
    this.a = a
    this.b = b
    log(`Compose {id: ${this.id}}#new()`)
    log(` a.on('data', ...)`)
    log(` a.once('end', ...)`)
    log(` a.once('error', ...)`)
    log(` a.pause()`)
    log(` b.on('drain', ...)`)
    log(` b.on('data', ...)`)
    log(` b.on('error', ...)`)
    log(` b.on('finish', ...)`)
    // End of upstream ends the writable side of `b`.
    this.a.once('end', () => {
      log(`Compose {id: ${this.id}}#new()`)
      log(` a.on('end', ...)`)
      log(` b.end()`)
      this.b.end()
    })
    // Forward chunks a -> b, pausing `a` when `b`'s buffer is full.
    this.a.on('data', ck => {
      log(`Compose {id: ${this.id}}#new()`)
      log(` a.on('data', ...)`)
      log(` b.write(${ck})`)
      const canWrite = this.b.write(ck)
      if (!canWrite) {
        log(` a.pause()`)
        this.a.pause()
      }
    })
    this.a.once('error', e => {
      log(`Compose {id: ${this.id}}#new()`)
      log(` a.once('error', ...)`)
      log(` this.destroy(${e})`)
      this.destroy(e)
      this.b.destroy(e)
    })
    // `b` drained: upstream may flow again.
    this.b.on('drain', () => {
      log(`Compose {id: ${this.id}}#new()`)
      log(` b.on('drain', ...)`)
      log(` this.a.resume()`)
      this.a.resume()
    })
    // Forward chunks b -> downstream, pausing `b` on backpressure.
    this.b.on('data', ck => {
      log(`Compose {id: ${this.id}}#new()`)
      log(` b.on('data', ...)`)
      log(` this.push(${ck})`)
      const canPush = this.push(ck)
      if (!canPush) {
        log(` b.pause()`)
        this.b.pause()
      }
    })
    this.b.once('error', e => {
      log(`Compose {id: ${this.id}}#new()`)
      log(` b.once('error', ...)`)
      log(` this.destroy(${e})`)
      this.destroy(e)
      this.a.destroy(e)
    })
    // NOTE(review): manually pushes EOF, ends the writable side, and
    // re-emits 'finish' after `b` finishes; end() will itself emit
    // 'finish', so listeners may see it twice — confirm intended.
    this.b.once('finish', () => {
      log(`Compose {id: ${this.id}}#new()`)
      log(` b.once('finish', ...)`)
      log(` this.emit('finish')`)
      this.push(null)
      this.end()
      this.emit('finish')
    })
  }

  _read() {
    log(`Compose {id: ${this.id}}#_read()`)
    if (this.b.isPaused()) {
      log(` b.resume()`)
      this.b.resume()
    }
  }

  /** @type {Stream.Duplex['_write']} */
  _write(ck, _enc, cb) {
    log(`Compose {id: ${this.id}}#_write(${ck}, _, _)`)
    // Fixed: the original guard was `this.a instanceof Stream.Readable`,
    // which is ALSO true for Transform/Duplex (they extend Readable), so
    // every write threw. Test writability instead, and report misuse via
    // the callback — throwing synchronously from _write violates the
    // Writable contract and bypasses 'error' handling.
    if (!(this.a instanceof Stream.Writable)) {
      cb(new Error('Cannot `write` to a Readable stream'))
      return
    }
    log(` this.a.write(${ck}, _, _)`)
    this.a.write(ck, _enc, cb)
  }
}
/**
 * Monadic bind for object streams: each written chunk is passed to `f`,
 * which allocates a child readable whose chunks are spliced into this
 * stream's output, in write order.
 * @template T
 */
export class Bind extends Stream.Duplex {
  /**
   * @param {(t: T) => () => Stream.Readable} f effect thunk producing the
   *   child stream for a chunk
   */
  constructor(f) {
    super({objectMode: true, allowHalfOpen: true})
    this.f = f
    this.id = bindCount++
    // Index of the child stream currently being drained.
    this.ix = 0
    /** @type {Array<Stream.Readable>} */
    this.streams = []
    // Completion callback captured in _final; invoked only once every
    // child stream has been consumed.
    /** @type {(() => void) | undefined} */
    this.doneCb = undefined
    // True while no child is actively being consumed; _write restarts.
    this.paused = true
  }

  /** Defer finishing until all children are drained (see init()). @type {NonNullable<Stream.Duplex['_final']>} */
  _final(cb) {
    log(`Bind {id: ${this.id}}#_final(_)`)
    this.doneCb = cb
  }

  // Start draining the child at `this.ix`; advances on 'end' and signals
  // completion when children are exhausted after _final ran.
  init() {
    log(`Bind {id: ${this.id}}#init()`)
    const s = this.streams[this.ix]
    if (this.paused || (!s && !this.doneCb)) {
      // Nothing to drain yet (or consumption is paused): wait for the
      // next _write to restart us.
      this.paused = true
      return
    } else if (this.doneCb) {
      // NOTE(review): this branch wins whenever _final has already run,
      // even if `s` exists and is still undrained — confirm a child cannot
      // be skipped when the writable side finishes early.
      log(` this.doneCb()`)
      this.doneCb()
      this.doneCb = undefined
      return
    }
    log(` s.on('data', ...)`)
    s.on('data', ck => {
      log(`Bind {id: ${this.id}}#initcur()`)
      log(` s.on('data', ...)`)
      log(` this.push(${ck})`)
      const canPush = this.push(ck)
      // Respect downstream backpressure; _read resumes paused children.
      if (!canPush) {
        s.pause()
      }
    })
    log(` s.once('end', ...)`)
    s.once('end', () => {
      log(`Bind {id: ${this.id}}#initcur()`)
      log(` s.once('end', ...)`)
      log(` this.ix++`)
      log(` this.init()`)
      this.ix++
      this.init()
    })
  }

  /** Allocate the child stream for this chunk; restart draining if idle. @type {Stream.Duplex['_write']} */
  _write(ck, _, cb) {
    log(`Bind {id: ${this.id}}#_write(${ck}, _, _)`)
    try {
      log(` this.streams = ${JSON.stringify(this.streams.map(_ => 'Readable'))}`)
      this.streams.push(this.f(ck)())
      if (this.paused) {
        log(` this.init()`)
        this.paused = false
        this.init()
      }
      log(` cb()`)
      cb()
    } catch(e) {
      // Errors from `f` (or stream allocation) are reported through the
      // write callback, surfacing as a stream 'error'.
      log(` cb(${e})`)
      // @ts-ignore
      cb(e)
    }
  }

  /** Resume any paused children when downstream asks for data. @type {Stream.Duplex['_read']} */
  _read() {
    log(`Bind {id: ${this.id}}#_read()`)
    this.streams.forEach(s =>
      s.isPaused() ? s.resume() : undefined)
  }
}
// FFI entry points for Node.Stream.Object.purs: each returns a thunk
// (`() => ...`) so the PureScript side can treat allocation as an Effect.

/** @type {() => Stream.Readable} */
export const neverImpl = () => new Never();
/** @type {<T>(a: T) => () => Stream.Transform} */
export const constImpl = (a) => () => new Const(a);
/** @type {<T>(a: T) => () => Stream.Readable} */
export const onceImpl = (a) => () => new Once(a);
/** @type {<T>(a: () => Promise<T>) => () => Stream.Readable} */
export const fromPromiseImpl = (a) => () => new FromPromise(a())
/** @type {(ss: Array<Stream.Readable>) => () => Stream.Readable} */
export const chainImpl = (ss) => () => new Chain(...ss);
/** @type {<T, R>(f: (t: T) => R) => () => Stream.Transform} */
export const mapImpl = (f) => () => new Map(f);
/**
 * Applicative apply: zip function chunks with value chunks, then apply.
 * @type {(iab: Stream.Readable) => (ia: Stream.Readable) => () => Stream.Readable}
 */
export const applyImpl = (iab) => (ia) => () =>
  new Compose(new Zip(iab, ia), new Map(([ab, a]) => ab(a)))
/** @type {<T>(f: (t: T) => () => Stream.Readable) => () => Stream.Duplex} */
export const bindImpl = (f) => () => new Bind(f)
/** @type {(a: Stream.Transform | Stream.Readable) => (b: Stream.Transform) => () => Stream.Duplex} */
export const pipeImpl = (a) => (b) => () => new Compose(a, b)
// Removed the leftover `process.on('beforeExit', () => { debugger })` hook:
// a bare `debugger` statement is a development artifact and must not ship.

189
src/Node.Stream.Object.purs Normal file
View File

@ -0,0 +1,189 @@
module Node.Stream.Object where
import Prelude
import Control.Monad.Rec.Class (untilJust)
import Control.Monad.ST.Global as ST
import Control.Monad.ST.Ref as STRef
import Control.Promise (Promise)
import Control.Promise as Promise
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Either (Either(..))
import Data.Maybe (Maybe(..))
import Data.Newtype (class Newtype, unwrap, wrap)
import Data.Profunctor (class Profunctor)
import Data.Traversable (class Traversable, traverse)
import Effect (Effect)
import Effect.Aff (Aff, delay, effectCanceler, launchAff_, makeAff)
import Effect.Aff.Class (class MonadAff)
import Effect.Class (class MonadEffect, liftEffect)
import Effect.Exception (Error)
import Effect.Uncurried (mkEffectFn1)
import Node.Buffer (Buffer)
import Node.EventEmitter (EventHandle(..))
import Node.EventEmitter as Event
import Node.EventEmitter.UtilTypes (EventHandle1, EventHandle0)
import Node.Stream (Readable, Writable)
import Unsafe.Coerce (unsafeCoerce)
-- | FFI handle to a Node object-mode stream consuming chunks of type `i`
-- | and emitting chunks of type `o`.
foreign import data Stream :: Type -> Type -> Type

-- | A deferred stream: the wrapped `Effect` allocates the underlying Node
-- | stream, so `ObjectStream` values compose purely and only touch Node
-- | when finally run.
newtype ObjectStream :: Type -> Type -> Type
newtype ObjectStream i o = ObjectStream (Effect (Stream i o))

derive instance Newtype (ObjectStream i o) _

-- | `map` pipes the stream into a fresh `Transform` applying the function
-- | to every chunk (see `mapImpl` / `pipeImpl`).
instance Functor (ObjectStream i) where
  map :: forall a b. (a -> b) -> ObjectStream i a -> ObjectStream i b
  map ab (ObjectStream ia) = wrap $ join $ pure pipeImpl <*> ia <*> mapImpl ab

-- | `apply` zips function chunks with value chunks (see `applyImpl`).
instance Apply (ObjectStream i) where
  apply :: forall a b. ObjectStream i (a -> b) -> ObjectStream i a -> ObjectStream i b
  apply (ObjectStream iab) (ObjectStream ia) = wrap $ join $ pure applyImpl <*> iab <*> ia

instance Applicative (ObjectStream i) where
  pure = once

instance Monad (ObjectStream i)

-- | Run the effect, then emit its result as a single chunk.
instance MonadEffect (ObjectStream Unit) where
  liftEffect = wrap <<< flip bind onceImpl

-- | Run the `Aff` via a JS promise, emitting its result as a single chunk.
instance MonadAff (ObjectStream Unit) where
  liftAff = wrap <<< fromPromiseImpl <<< Promise.fromAff

-- | Feed each emitted chunk through `asb`, splicing the chunks of every
-- | produced stream into the output in order (see `bindImpl`).
instance Bind (ObjectStream i) where
  bind (ObjectStream sia') asb = wrap do
    sia <- sia'
    sab <- bindImpl (unwrap <<< asb)
    pipeImpl sia sab

-- | Map over the input side with `ab` and the output side with `cd` by
-- | piping through two `Transform`s.
instance Profunctor ObjectStream where
  dimap ab cd (ObjectStream sbc') = wrap do
    sbc <- sbc'
    sab <- mapImpl ab
    scd <- mapImpl cd
    sac <- pipeImpl sab sbc
    pipeImpl sac scd
-- | A stream that will emit the value `a` exactly once, and ignores any written chunks.
once :: forall i a. a -> ObjectStream i a
once = wrap <<< onceImpl

-- | A stream that will immediately emit `close` and `end` events.
never :: forall a. ObjectStream Unit a
never = wrap neverImpl

-- | A stream that for all input chunks, emits `unit`.
sink :: forall a. ObjectStream a Unit
sink = pure unit

-- | Create a stream that emits each element of the container in order.
-- | (The constraint is `Traversable`, not just `Foldable`, because
-- | `chainMany` traverses the container effectfully.)
fromFoldable :: forall f a. Traversable f => f a -> ObjectStream Unit a
fromFoldable = chainMany <<< map once

-- | Convert a `Readable` stream emitting `Buffer` chunks to an `ObjectStream`
fromBufferReadable :: forall r. Readable r -> ObjectStream Unit Buffer
fromBufferReadable r = wrap $ pure $ (unsafeCoerce :: Readable r -> Stream Unit Buffer) r

-- | Convert a `Readable` stream emitting `String` chunks to an `ObjectStream`
fromStringReadable :: forall r. Readable r -> ObjectStream Unit String
fromStringReadable r = wrap $ pure $ (unsafeCoerce :: Readable r -> Stream Unit String) r

-- | Convert a `Writable` stream accepting `Buffer` chunks to an `ObjectStream`
fromBufferWritable :: forall r. Writable r -> ObjectStream Buffer Unit
fromBufferWritable r = wrap $ pure $ (unsafeCoerce :: Writable r -> Stream Buffer Unit) r

-- | Convert a `Writable` stream accepting `String` chunks to an `ObjectStream`
fromStringWritable :: forall r. Writable r -> ObjectStream String Unit
fromStringWritable r = wrap $ pure $ (unsafeCoerce :: Writable r -> Stream String Unit) r

-- | Emit chunks from the first stream, then when exhausted emit chunks from the second.
chain :: forall a. ObjectStream Unit a -> ObjectStream Unit a -> ObjectStream Unit a
chain a b = chainMany [ a, b ]

-- | `chain` for an arbitrary number of streams.
chainMany :: forall f a. Traversable f => f (ObjectStream Unit a) -> ObjectStream Unit a
chainMany as' = wrap do
  as <- Array.fromFoldable <$> traverse unwrap as'
  chainImpl as
-- | Run a stream to completion, discarding any emitted chunks.
run_ :: forall a. ObjectStream Unit a -> Aff Unit
run_ = void <<< run

-- | Run a stream to completion, collecting every emitted chunk in order.
run :: forall a. ObjectStream Unit a -> Aff (Array a)
run (ObjectStream s') = do
  runningCount <- liftEffect $ ST.toEffect $ STRef.new 0
  values <- liftEffect $ ST.toEffect $ Array.ST.new
  s <- liftEffect s'
  makeAff \res ->
    let
      -- NOTE(review): the counter is incremented and decremented within the
      -- same synchronous ST block, so the polling loop below can never
      -- observe it nonzero — confirm whether it was meant to guard an
      -- asynchronous handler instead.
      onData a = ST.toEffect do
        void $ STRef.modify (_ + 1) runningCount
        void $ Array.ST.push a values
        void $ STRef.modify (_ - 1) runningCount
      onError e = res $ Left e
      -- After 'end', poll (1ms intervals) until no data handler is
      -- mid-flight, then freeze the collected chunks and complete.
      onEnd = launchAff_ do
        untilJust do
          delay $ wrap $ 1.0
          running <- liftEffect $ ST.toEffect $ STRef.read runningCount
          pure $ if running == 0 then Just unit else Nothing
        values' <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze values
        liftEffect $ res $ Right values'
    in
      do
        cancelData <- Event.on dataH onData s
        cancelError <- Event.on errorH onError s
        cancelEnd <- Event.on endH onEnd s
        -- Cancellation detaches all three listeners.
        pure $ effectCanceler do
          cancelData
          cancelError
          cancelEnd
-- | Constructs a Stream that re-emits the outputs from each stream, in order.
foreign import chainImpl :: forall a. Array (Stream Unit a) -> Effect (Stream Unit a)

-- | Pipes a stream's output into another's input, returning the new composite stream.
-- |
-- | Note that this differs from `Readable#pipe`, which returns the destination stream
-- | verbatim to allow chained piping of only _outputs_.
foreign import pipeImpl :: forall a b c. Stream a b -> Stream b c -> Effect (Stream a c)

-- | A readable stream that immediately closes without emitting any chunks.
foreign import neverImpl :: forall a. Effect (Stream Unit a)

-- | Constructs a readable stream from an asynchronous value.
foreign import fromPromiseImpl :: forall a. Effect (Promise a) -> Effect (Stream Unit a)

-- | Constructs a readable stream that emits a single value then closes.
foreign import onceImpl :: forall i a. a -> Effect (Stream i a)

-- | Constructs a `Transform` applying the given function to chunks.
foreign import mapImpl :: forall a b. (a -> b) -> Effect (Stream a b)

-- | Constructs a `Transform` zipping functions from the left stream with values from the right stream.
-- |
-- | Closes when either stream closes.
foreign import applyImpl :: forall i a b. Stream i (a -> b) -> Stream i a -> Effect (Stream i b)

-- | Constructs a `Transform` which applies a function to each written chunk.
-- |
-- | The values emitted by the stream returned by this function are then emitted
-- | until the temporary stream closes.
foreign import bindImpl :: forall a _x b. (a -> Effect (Stream _x b)) -> Effect (Stream a b)

-- | `data` event handle: invoked once per emitted chunk.
dataH :: forall i o. EventHandle1 (Stream i o) o
dataH = EventHandle "data" mkEffectFn1

-- | `readable` event handle.
readableH :: forall i o. EventHandle0 (Stream i o)
readableH = EventHandle "readable" identity

-- | `close` event handle.
closeH :: forall i o. EventHandle0 (Stream i o)
closeH = EventHandle "close" identity

-- | `end` event handle: fired when the readable side is exhausted.
endH :: forall i o. EventHandle0 (Stream i o)
endH = EventHandle "end" identity

-- | `error` event handle, carrying the stream's `Error`.
errorH :: forall i o. EventHandle1 (Stream i o) Error
errorH = EventHandle "error" mkEffectFn1

View File

@ -2,11 +2,13 @@ module Test.Main where
import Prelude import Prelude
import Data.Maybe (Maybe(..))
import Effect (Effect) import Effect (Effect)
import Effect.Class.Console (log) import Effect.Aff (launchAff_)
import Test.Node.Stream.Object as Test.Node.Stream.Object
import Test.Spec.Reporter (consoleReporter, specReporter)
import Test.Spec.Runner (defaultConfig, runSpec')
main :: Effect Unit main :: Effect Unit
main = do main = launchAff_ $ runSpec' (defaultConfig { timeout = Nothing }) [ consoleReporter ] do
log "🍕" Test.Node.Stream.Object.spec
log "You should add some tests."

View File

@ -0,0 +1,57 @@
module Test.Node.Stream.Object where
import Prelude
import Data.Newtype (wrap)
import Effect.Aff (delay)
import Effect.Aff.Class (liftAff)
import Node.Stream.Object as Stream
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual)
-- | Test suite for the primitive `ObjectStream` constructors and
-- | combinators: `once`, `never`, `chainMany`, `fromFoldable`, and the
-- | `Bind` instance (including fan-out ordering).
spec :: Spec Unit
spec =
  describe "Node.Stream.Object" do
    describe "ObjectStream" do
      describe "once" do
        it "emits once" do
          out <- Stream.run (Stream.once 1)
          out `shouldEqual` [ 1 ]
      describe "never" do
        it "immediately closes" do
          out <- Stream.run Stream.never
          out `shouldEqual` ([] :: Array Int)
      describe "chain" do
        it "noops" do
          out <- Stream.run $ Stream.chainMany []
          out `shouldEqual` ([] :: Array Int)
        it "works with 1 stream" do
          out <- Stream.run $ Stream.chainMany [Stream.once 1]
          out `shouldEqual` [1]
        it "works with 2 streams" do
          out <- Stream.run $ Stream.chainMany [Stream.once 1, Stream.once 2]
          out `shouldEqual` [1, 2]
        it "does not emit end until last child stream ends" do
          let
            -- A stream whose single value arrives after an Aff delay of
            -- `n` milliseconds.
            delayed n a = liftAff do
              delay $ wrap n
              pure a
          out <- Stream.run $ Stream.chainMany [delayed 10.0 1, delayed 20.0 2]
          out `shouldEqual` [1, 2]
      describe "fromFoldable" do
        it "creates an empty readable" do
          out <- Stream.run (Stream.fromFoldable [] :: Stream.ObjectStream Unit (Array Int))
          out `shouldEqual` []
        it "creates a readable that emits each element" do
          out <- Stream.run (Stream.fromFoldable [ 1, 2, 3 ])
          out `shouldEqual` [ 1, 2, 3 ]
        it "bind maps each number" do
          out <- Stream.run do
            a <- Stream.fromFoldable [ 1, 2, 3 ]
            pure $ a + 1
          out `shouldEqual` [ 2, 3, 4 ]
        -- Each input chunk fans out into two, interleaved in source order.
        it "bind fans out" do
          out <- Stream.run do
            a <- Stream.fromFoldable [ 1, 2, 3 ]
            Stream.fromFoldable [a * 10, a * 20]
          out `shouldEqual` [ 10, 20, 20, 40, 30, 60 ]