diff --git a/package.json b/package.json index b949871..b46138c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,7 @@ { "name": "purescript-csv-stream", "version": "v1.2.17", + "type": "module", "dependencies": { "csv-parse": "^5.5.5", "csv-stringify": "^6.4.6" diff --git a/spago.lock b/spago.lock index 4543e72..8cb2538 100644 --- a/spago.lock +++ b/spago.lock @@ -4,6 +4,7 @@ workspace: path: ./ dependencies: - aff: ">=7.1.0 <8.0.0" + - aff-promise - arrays: ">=7.3.0 <8.0.0" - bifunctors: ">=6.0.0 <7.0.0" - control: ">=6.0.0 <7.0.0" @@ -27,6 +28,7 @@ workspace: - ordered-collections: ">=3.2.0 <4.0.0" - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" + - profunctor - record: ">=4.0.0 <5.0.0" - record-extra: ">=5.0.1 <6.0.0" - st: ">=6.2.0 <7.0.0" @@ -37,11 +39,16 @@ workspace: - unsafe-coerce: ">=6.0.0 <7.0.0" test_dependencies: - console + - spec build_plan: - aff + - aff-promise + - ansi - arraybuffer-types - arrays + - avar - bifunctors + - catenable-lists - console - const - contravariant @@ -59,7 +66,9 @@ workspace: - foldable-traversable - foreign - foreign-object + - fork - formatters + - free - functions - functors - gen @@ -70,6 +79,7 @@ workspace: - lazy - lists - maybe + - mmorph - newtype - node-buffer - node-event-emitter @@ -83,6 +93,7 @@ workspace: - parallel - parsing - partial + - pipes - precise-datetime - prelude - profunctor @@ -90,6 +101,7 @@ workspace: - record-extra - refs - safe-coerce + - spec - st - strings - tailrec @@ -124,6 +136,21 @@ packages: - tailrec - transformers - unsafe-coerce + aff-promise: + type: registry + version: 4.0.0 + integrity: sha256-Kq5EupbUpXeUXx4JqGQE7/RTTz/H6idzWhsocwlEFhM= + dependencies: + - aff + - foreign + ansi: + type: registry + version: 7.0.0 + integrity: sha256-ZMB6HD+q9CXvn9fRCmJ8dvuDrOVHcjombL3oNOerVnE= + dependencies: + - foldable-traversable + - lists + - strings arraybuffer-types: type: registry version: 3.0.2 @@ -148,6 +175,17 @@ packages: - tuples - unfoldable - unsafe-coerce + avar: + type: registry + version: 5.0.0 + integrity: sha256-e7hf0x4hEpcygXP0LtvfvAQ49Bbj2aWtZT3gqM///0A= + dependencies: + - aff + - effect + - either + - exceptions + - functions + - maybe bifunctors: type: registry version: 6.0.0 @@ -158,6 +196,18 @@ packages: - newtype - prelude - tuples + catenable-lists: + type: registry + version: 7.0.0 + integrity: sha256-76vYENhwF4BWTBsjeLuErCH2jqVT4M3R1HX+4RwSftA= + dependencies: + - control + - foldable-traversable + - lists + - maybe + - prelude + - tuples + - unfoldable console: type: registry version: 6.1.0 @@ -339,6 +389,12 @@ packages: - tuples - typelevel-prelude - unfoldable + fork: + type: registry + version: 6.0.0 + integrity: sha256-X7u0SuCvFbLbzuNEKLBNuWjmcroqMqit4xEzpQwAP7E= + dependencies: + - aff formatters: type: registry version: 7.0.0 @@ -351,6 +407,25 @@ packages: - parsing - prelude - transformers + free: + type: registry + version: 7.1.0 + integrity: sha256-JAumgEsGSzJCNLD8AaFvuX7CpqS5yruCngi6yI7+V5k= + dependencies: + - catenable-lists + - control + - distributive + - either + - exists + - foldable-traversable + - invariant + - lazy + - maybe + - prelude + - tailrec + - transformers + - tuples + - unsafe-coerce functions: type: registry version: 6.0.0 @@ -460,6 +535,14 @@ packages: - invariant - newtype - prelude + mmorph: + type: registry + version: 7.0.0 + integrity: sha256-urZlZNNqGeQFe5D/ClHlR8QgGBNHTMFPtJ5S5IpflTQ= + dependencies: + - free + - functors + - transformers newtype: type: registry version: 5.0.0 @@ -610,6 +693,18 @@ packages: version: 4.0.0 integrity: sha256-fwXerld6Xw1VkReh8yeQsdtLVrjfGiVuC5bA1Wyo/J4= dependencies: [] + pipes: + type: registry + version: 8.0.0 + integrity: sha256-kvfqGM4cPA/wCcBHbp5psouFw5dZGvku2462x7ZBwSY= + dependencies: + - aff + - lists + - mmorph + - prelude + - tailrec + - transformers + - tuples precise-datetime: type: registry version: 7.0.0 @@ -683,6 +778,38 @@ packages: integrity: sha256-a1ibQkiUcbODbLE/WAq7Ttbbh9ex+x33VCQ7GngKudU= dependencies: - unsafe-coerce + spec: + type: registry + version: 7.6.0 + integrity: sha256-+merGdQbL9zWONbnt8S8J9afGJ59MQqGtS0qSd3yu4I= + dependencies: + - aff + - ansi + - arrays + - avar + - bifunctors + - control + - datetime + - effect + - either + - exceptions + - foldable-traversable + - fork + - identity + - integers + - lists + - maybe + - newtype + - now + - ordered-collections + - parallel + - pipes + - prelude + - refs + - strings + - tailrec + - transformers + - tuples st: type: registry version: 6.2.0 diff --git a/spago.yaml b/spago.yaml index 3252a56..8cc1533 100644 --- a/spago.yaml +++ b/spago.yaml @@ -10,6 +10,8 @@ package: strict: true pedanticPackages: true dependencies: + - aff-promise + - profunctor - aff: ">=7.1.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0" - bifunctors: ">=6.0.0 <7.0.0" @@ -46,5 +48,6 @@ package: main: Test.Main dependencies: - console + - spec workspace: extraPackages: {} diff --git a/src/Node.Stream.Object.js b/src/Node.Stream.Object.js new file mode 100644 index 0000000..92cc22e --- /dev/null +++ b/src/Node.Stream.Object.js @@ -0,0 +1,310 @@ +import Stream from "stream"; + +export class Never extends Stream.Readable { + constructor() { + super({read: function() { + this.push(null) + }, objectMode: true}) + } +} + +/** @template T */ +export class Once extends Stream.Readable { + /** @param {T} a */ + constructor(a) { + super({read: function() { }, objectMode: true}) + this.push(a) + this.push(null) + } +} + +/** @template T */ +export class Const extends Stream.Transform { + /** @param {T} a */ + constructor(a) { + super({transform: function(_c, _enc, cb) { + cb(null, a) + }, objectMode: true}) + } +} + +/** @template T */ +export class FromPromise extends Stream.Readable { + /** @param {Promise} p */ + constructor(p) { + p + .then(a => { + this.push(a) + this.push(null) + }) + .catch(e => { + this.destroy(e) + }) + + super({read: function() {}, objectMode: true}) + } +} + +/** @template T */ +export class Chain extends Stream.Readable { + /** @param {...Stream.Readable} streams */ + constructor(...streams) { + super({objectMode: true}) + this.ix = -1 + this.streams = streams + /** @type {Stream.Readable | undefined} */ + this.cur = undefined + this.next() + } + + next() { + this.ix++ + if (this.ix === this.streams.length) { + return undefined + } else { + this.cur = this.streams[this.ix] + + this.cur.once('end', () => { + this.next() + }) + + this.cur.on('data', ck => { + const canPush = this.push(ck) + if (this.cur && !canPush) { + this.cur.pause() + } + }) + } + } + + _read() { + if (!this.cur) { + this.push(null) + } else if (this.cur.isPaused()) { + this.cur.resume() + } + } +} + +/** + * @template T + * @template R + */ +export class Map extends Stream.Transform { + /** @param {(t: T) => R} f */ + constructor(f) { + super({ + transform: function (chunk, _, cb) { + try { + this.push(f(chunk)) + cb(); + } catch (e) { + // @ts-ignore + cb(e); + } + }, + objectMode: true + }) + } +} + +export class Zip extends Stream.Readable { + /** @type {Array} */ + streams = [] + + /** @type {Array} */ + buf = [] + + /** @param {...Stream.Readable} streams */ + constructor(...streams) { + super({objectMode: true}) + this.streams = streams + this.streams.forEach((s, ix) => { + s.once('error', e => this.destroy(e)) + s.once('end', () => this.push(null)) + s.on('data', ck => { + const canPush = this.bufput(ix, ck) + if (!canPush) { + this.streams.forEach(s => s.pause()) + } + }) + }) + } + + /** @type {(ix: number, val: unknown) => boolean} */ + bufput(ix, val) { + this.buf[ix] = val + if (!this.isWaiting()) { + const canPush = this.push(this.buf) + this.bufinit() + return canPush + } else { + return true + } + } + + bufinit() { + this.buf = this.streams.map(() => null) + } + + isWaiting() { + return this.buf.some(a => a === null) + } + + _read() { + this.streams.forEach(s => { + if (s.isPaused()) { + s.resume() + } + }) + } +} + +export class Compose extends Stream.Duplex { + /** + * @param {Stream.Readable | Stream.Transform} a + * @param {Stream.Transform} b + */ + constructor(a, b) { + super({objectMode: true}) + this.a = a + this.b = b + + this.a.on('data', ck => { + const canWrite = this.b.write(ck) + if (!canWrite) { + this.a.pause() + } + }) + this.a.once('error', e => this.destroy(e)) + this.a.once('end', () => this.push(null)) + + this.b.on('drain', () => { + if (this.a.isPaused()) { + this.a.resume() + } + }) + this.b.on('data', ck => { + const canPush = this.push(ck) + if (!canPush) { + this.a.pause() + this.b.pause() + } + }) + this.b.once('error', e => this.destroy(e)) + this.b.once('end', () => this.emit('end')) + this.b.once('finish', () => this.emit('finish')) + } + + _read() { + if (this.a.isPaused()) { + this.a.resume() + } + + if (this.b.isPaused()) { + this.b.resume() + } + } + + /** @type {Stream.Duplex['_write']} */ + _write(ck, _enc, cb) { + if (this.a instanceof Stream.Readable) { + throw new Error('Cannot `write` to a Readable stream') + } + + this.a.write(ck, _enc, cb) + } +} + +/** + * @template T + */ +export class Bind extends Stream.Duplex { + /** + * @param {(t: T) => () => Stream.Readable} f + */ + constructor(f) { + super({objectMode: true}) + this.f = f + + /** @type {Stream.Readable | undefined } */ + this.cur = undefined + + /** @type {Array} */ + this.streams = [] + } + + initcur() { + if (!this.cur) { + this.cur = this.streams[0] + } + + this.cur.on('data', ck => { + const canPush = this.push(ck) + if (!canPush && this.cur) { + this.cur.pause() + } + }) + + this.cur.on('end', () => { + this.streams.shift() + if (this.streams.length > 0) { + this.cur = this.streams[0] + this.initcur() + } else { + this.cur = undefined + } + }) + } + + /** @type {Stream.Duplex['_write']} */ + _write(ck, _, cb) { + try { + this.streams.push(this.f(ck)()) + this.initcur() + cb() + } catch(e) { + // @ts-ignore + cb(e) + } + } + + /** @type {Stream.Duplex['_read']} */ + _read() { + if (this.cur && this.cur.isPaused()) { + this.cur.resume() + } + } +} + +/** @type {() => Stream.Readable} */ +export const neverImpl = () => new Never(); + +/** @type {(a: T) => () => Stream.Transform} */ +export const constImpl = (a) => () => new Const(a); + +/** @type {(a: T) => () => Stream.Readable} */ +export const onceImpl = (a) => () => new Once(a); + +/** @type {(a: () => Promise) => () => Stream.Readable} */ +export const fromPromiseImpl = (a) => () => new FromPromise(a()) + +/** @type {(ss: Array) => () => Stream.Readable} */ +export const chainImpl = (ss) => () => new Chain(...ss); + +/** @type {(f: (t: T) => R) => () => Stream.Transform} */ +export const mapImpl = (f) => () => new Map(f); + +/** @type {(iab: Stream.Readable) => (ia: Stream.Readable) => () => Stream.Readable} */ +export const applyImpl = (iab) => (ia) => () => + new Compose(new Zip(iab, ia), new Map(([ab, a]) => ab(a))) + +/** @type {(f: (t: T) => () => Stream.Readable) => () => Stream.Duplex} */ +export const bindImpl = (f) => () => new Bind(f) + +/** @type {(a: Stream.Transform | Stream.Readable) => (b: Stream.Transform) => () => Stream.Duplex} */ +export const pipeImpl = (a) => (b) => () => new Compose(a, b) + +process.on('beforeExit', () => { + debugger; +}) diff --git a/src/Node.Stream.Object.purs b/src/Node.Stream.Object.purs new file mode 100644 index 0000000..afa40fa --- /dev/null +++ b/src/Node.Stream.Object.purs @@ -0,0 +1,192 @@ +module Node.Stream.Object where + +import Prelude + +import Control.Monad.Rec.Class (untilJust) +import Control.Monad.ST.Global as ST +import Control.Monad.ST.Ref as STRef +import Control.Promise (Promise) +import Control.Promise as Promise +import Data.Array as Array +import Data.Array.ST as Array.ST +import Data.Either (Either(..)) +import Data.Maybe (Maybe(..)) +import Data.Newtype (class Newtype, unwrap, wrap) +import Data.Profunctor (class Profunctor) +import Data.Traversable (class Traversable, traverse) +import Effect (Effect) +import Effect.Aff (Aff, delay, effectCanceler, launchAff_, makeAff) +import Effect.Aff.Class (class MonadAff) +import Effect.Class (class MonadEffect, liftEffect) +import Effect.Exception (Error) +import Effect.Uncurried (mkEffectFn1) +import Node.Buffer (Buffer) +import Node.EventEmitter (EventHandle(..)) +import Node.EventEmitter as Event +import Node.EventEmitter.UtilTypes (EventHandle1, EventHandle0) +import Node.Stream (Readable, Writable) +import Unsafe.Coerce (unsafeCoerce) + +foreign import data Stream :: Type -> Type -> Type + +newtype ObjectStream :: Type -> Type -> Type +newtype ObjectStream i o = ObjectStream (Effect (Stream i o)) + +derive instance Newtype (ObjectStream i o) _ + +instance Functor (ObjectStream i) where + map :: forall a b. (a -> b) -> ObjectStream i a -> ObjectStream i b + map ab (ObjectStream ia) = wrap $ join $ pure pipeImpl <*> ia <*> mapImpl ab + +instance Apply (ObjectStream i) where + apply :: forall a b. ObjectStream i (a -> b) -> ObjectStream i a -> ObjectStream i b + apply (ObjectStream iab) (ObjectStream ia) = wrap $ join $ pure applyImpl <*> iab <*> ia + +instance Applicative (ObjectStream i) where + pure = wrap <<< constImpl + +instance Monad (ObjectStream i) + +instance MonadEffect (ObjectStream Unit) where + liftEffect = wrap <<< flip bind onceImpl + +instance MonadAff (ObjectStream Unit) where + liftAff = wrap <<< fromPromiseImpl <<< Promise.fromAff + +instance Bind (ObjectStream i) where + bind (ObjectStream sia') asb = wrap do + sia <- sia' + sab <- bindImpl (unwrap <<< asb) + pipeImpl sia sab + +instance Profunctor ObjectStream where + dimap ab cd (ObjectStream sbc') = wrap do + sbc <- sbc' + sab <- mapImpl ab + scd <- mapImpl cd + sac <- pipeImpl sab sbc + pipeImpl sac scd + +-- | A stream that will emit the value `a` exactly once. +once :: forall a. a -> ObjectStream Unit a +once = wrap <<< onceImpl + +-- | A stream that will immediately emit `close` and `end` events. +never :: forall a. ObjectStream Unit a +never = wrap neverImpl + +-- | A stream that for all input chunks, emits `unit`. +sink :: forall a. ObjectStream a Unit +sink = pure unit + +-- | Create a stream from a `Foldable` of `a`s +fromFoldable :: forall f a. Traversable f => f a -> ObjectStream Unit a +fromFoldable = chainMany <<< map once + +-- | Convert a `Readable` stream emitting `Buffer` chunks to an `ObjectStream` +fromBufferReadable :: forall r. Readable r -> ObjectStream Unit Buffer +fromBufferReadable r = wrap $ pure $ (unsafeCoerce :: Readable r -> Stream Unit Buffer) r + +-- | Convert a `Readable` stream emitting `String` chunks to an `ObjectStream` +fromStringReadable :: forall r. Readable r -> ObjectStream Unit String +fromStringReadable r = wrap $ pure $ (unsafeCoerce :: Readable r -> Stream Unit String) r + +-- | Convert a `Writable` stream accepting `Buffer` chunks to an `ObjectStream` +fromBufferWritable :: forall r. Writable r -> ObjectStream Buffer Unit +fromBufferWritable r = wrap $ pure $ (unsafeCoerce :: Writable r -> Stream Buffer Unit) r + +-- | Convert a `Writable` stream accepting `String` chunks to an `ObjectStream` +fromStringWritable :: forall r. Writable r -> ObjectStream String Unit +fromStringWritable r = wrap $ pure $ (unsafeCoerce :: Writable r -> Stream String Unit) r + +-- | Emit chunks from the first stream, then when exhausted emit chunks from the second. +chain :: forall a. ObjectStream Unit a -> ObjectStream Unit a -> ObjectStream Unit a +chain a b = chainMany [ a, b ] + +-- | `chain` for an arbitrary number of streams. +chainMany :: forall f a. Traversable f => f (ObjectStream Unit a) -> ObjectStream Unit a +chainMany as' = wrap do + as <- Array.fromFoldable <$> traverse unwrap as' + chainImpl as + +run_ :: forall a. ObjectStream Unit a -> Aff Unit +run_ = void <<< run + +run :: forall a. ObjectStream Unit a -> Aff (Array a) +run (ObjectStream s') = do + runningCount <- liftEffect $ ST.toEffect $ STRef.new 0 + values <- liftEffect $ ST.toEffect $ Array.ST.new + s <- liftEffect s' + makeAff \res -> + let + onData a = ST.toEffect do + void $ STRef.modify (_ + 1) runningCount + void $ Array.ST.push a values + void $ STRef.modify (_ - 1) runningCount + onError e = res $ Left e + onEnd = launchAff_ do + untilJust do + delay $ wrap $ 1.0 + running <- liftEffect $ ST.toEffect $ STRef.read runningCount + pure $ if running == 0 then Just unit else Nothing + values' <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze values + liftEffect $ res $ Right values' + in + do + cancelData <- Event.on dataH onData s + cancelError <- Event.on errorH onError s + cancelEnd <- Event.on endH onEnd s + pure $ effectCanceler do + cancelData + cancelError + cancelEnd + +-- | Constructs a `Transform` stream that always invokes the callback with the provided value. +foreign import constImpl :: forall i a. a -> Effect (Stream i a) + +-- | Constructs a Stream that re-emits the outputs from each stream, in order. +foreign import chainImpl :: forall a. Array (Stream Unit a) -> Effect (Stream Unit a) + +-- | Pipes a stream's output into another's input, returning the new composite stream. +-- | +-- | Note that this differs from `Readable#pipe`, which returns the destination stream +-- | verbatim to allow chained piping of only _outputs_. +foreign import pipeImpl :: forall a b c. Stream a b -> Stream b c -> Effect (Stream a c) + +-- | A readable stream that immediately closes without emitting any chunks. +foreign import neverImpl :: forall a. Effect (Stream Unit a) + +-- | Constructs a readable stream from an asynchronous value. +foreign import fromPromiseImpl :: forall a. Effect (Promise a) -> Effect (Stream Unit a) + +-- | Constructs a readable stream that emits a single value then closes. +foreign import onceImpl :: forall i a. a -> Effect (Stream i a) + +-- | Constructs a `Transform` applying the given function to chunks. +foreign import mapImpl :: forall a b. (a -> b) -> Effect (Stream a b) + +-- | Constructs a `Transform` zipping functions from the left stream with values from the right stream. +-- | +-- | Closes when either stream closes. +foreign import applyImpl :: forall i a b. Stream i (a -> b) -> Stream i a -> Effect (Stream i b) + +-- | Constructs a `Transform` which applies a function to each written chunk. +-- | +-- | The values emitted by the stream returned by this function are then emitted +-- | until the temporary stream closes. +foreign import bindImpl :: forall a _x b. (a -> Effect (Stream _x b)) -> Effect (Stream a b) + +dataH :: forall i o. EventHandle1 (Stream i o) o +dataH = EventHandle "data" mkEffectFn1 + +readableH :: forall i o. EventHandle0 (Stream i o) +readableH = EventHandle "readable" identity + +closeH :: forall i o. EventHandle0 (Stream i o) +closeH = EventHandle "close" identity + +endH :: forall i o. EventHandle0 (Stream i o) +endH = EventHandle "end" identity + +errorH :: forall i o. EventHandle1 (Stream i o) Error +errorH = EventHandle "error" mkEffectFn1 diff --git a/test/Test/Main.purs b/test/Test/Main.purs index e616930..e96c1ed 100644 --- a/test/Test/Main.purs +++ b/test/Test/Main.purs @@ -2,11 +2,13 @@ module Test.Main where import Prelude +import Data.Maybe (Maybe(..)) import Effect (Effect) -import Effect.Class.Console (log) +import Effect.Aff (launchAff_) +import Test.Node.Stream.Object as Test.Node.Stream.Object +import Test.Spec.Reporter (consoleReporter, specReporter) +import Test.Spec.Runner (defaultConfig, runSpec') main :: Effect Unit -main = do - log "🍕" - log "You should add some tests." - +main = launchAff_ $ runSpec' (defaultConfig { timeout = Nothing }) [ consoleReporter ] do + Test.Node.Stream.Object.spec diff --git a/test/Test/Node.Stream.Object.purs b/test/Test/Node.Stream.Object.purs new file mode 100644 index 0000000..acbeaed --- /dev/null +++ b/test/Test/Node.Stream.Object.purs @@ -0,0 +1,52 @@ +module Test.Node.Stream.Object where + +import Prelude + +import Data.Newtype (wrap) +import Effect.Aff (delay) +import Effect.Aff.Class (liftAff) +import Node.Stream.Object as Stream +import Test.Spec (Spec, describe, it) +import Test.Spec.Assertions (shouldEqual) + +spec :: Spec Unit +spec = + describe "Node.Stream.Object" do + describe "ObjectStream" do + describe "once" do + it "emits once" do + out <- Stream.run (Stream.once 1) + out `shouldEqual` [ 1 ] + describe "never" do + it "immediately closes" do + out <- Stream.run Stream.never + out `shouldEqual` ([] :: Array Int) + describe "chain" do + it "noops" do + out <- Stream.run $ Stream.chainMany [] + out `shouldEqual` ([] :: Array Int) + it "works with 1 stream" do + out <- Stream.run $ Stream.chainMany [Stream.once 1] + out `shouldEqual` [1] + it "works with 2 streams" do + out <- Stream.run $ Stream.chainMany [Stream.once 1, Stream.once 2] + out `shouldEqual` [1, 2] + it "does not emit end until last child stream ends" do + let + delayed n a = liftAff do + delay $ wrap n + pure a + out <- Stream.run $ Stream.chainMany [delayed 10.0 1, delayed 20.0 2] + out `shouldEqual` [1, 2] + describe "fromFoldable" do + it "creates an empty readable" do + out <- Stream.run (Stream.fromFoldable [] :: Stream.ObjectStream Unit (Array Int)) + out `shouldEqual` [] + it "creates a readable that emits each element" do + out <- Stream.run (Stream.fromFoldable [ 1, 2, 3 ]) + out `shouldEqual` [ 1, 2, 3 ] + it "bind fans out" do + out <- Stream.run do + a <- Stream.fromFoldable [ 1, 2, 3 ] + Stream.fromFoldable [a * 10, a * 20] + out `shouldEqual` [ 10, 20, 20, 40, 30, 60 ]