feat: rework API, use node-stream-pipes

This commit is contained in:
orion 2024-05-10 18:40:36 -05:00
parent d355d3b91c
commit d8b0039678
Signed by: orion
GPG Key ID: 6D4165AE4C928719
11 changed files with 541 additions and 263 deletions

2
.gitignore vendored
View File

@ -1,4 +1,3 @@
bower_components/ bower_components/
node_modules/ node_modules/
.pulp-cache/ .pulp-cache/
@ -10,3 +9,4 @@ generated-docs/
.purs* .purs*
.psa* .psa*
.spago .spago
.tmp/

BIN
bun.lockb

Binary file not shown.

View File

@ -1,9 +1,11 @@
{ {
"name": "purescript-csv-stream", "name": "purescript-csv-stream",
"version": "v1.2.19", "version": "v1.2.19",
"type": "module",
"dependencies": { "dependencies": {
"csv-parse": "^5.5.5", "csv-parse": "^5.5.5",
"csv-stringify": "^6.4.6" "csv-stringify": "^6.4.6",
"decimal.js": "^10.4.3"
}, },
"devDependencies": { "devDependencies": {
"typescript": "^5.4.5" "typescript": "^5.4.5"

View File

@ -6,13 +6,10 @@ workspace:
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0" - bifunctors: ">=6.0.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0" - datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0" - effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0"
- filterable: ">=5.0.0 <6.0.0" - foldable-traversable
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0" - foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0" - foreign-object: ">=4.1.0 <5.0.0"
- integers: ">=6.0.0 <7.0.0" - integers: ">=6.0.0 <7.0.0"
@ -21,27 +18,39 @@ workspace:
- newtype: ">=5.0.0 <6.0.0" - newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0" - numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- pipes: ">=8.0.0 <9.0.0"
- precise-datetime: ">=7.0.0 <8.0.0" - precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"
- record-extra: ">=5.0.1 <6.0.0" - record-extra: ">=5.0.1 <6.0.0"
- st: ">=6.2.0 <7.0.0" - st
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0"
- tuples
- typelevel-prelude: ">=7.0.0 <8.0.0" - typelevel-prelude: ">=7.0.0 <8.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies: test_dependencies:
- console - console
- gen
- node-fs
- node-zlib
- quickcheck
- simple-json
- spec
build_plan: build_plan:
- aff - aff
- ansi
- arraybuffer-types - arraybuffer-types
- arrays - arrays
- avar
- bifunctors - bifunctors
- catenable-lists
- console - console
- const - const
- contravariant - contravariant
@ -54,12 +63,13 @@ workspace:
- enums - enums
- exceptions - exceptions
- exists - exists
- filterable
- fixed-points - fixed-points
- foldable-traversable - foldable-traversable
- foreign - foreign
- foreign-object - foreign-object
- fork
- formatters - formatters
- free
- functions - functions
- functors - functors
- gen - gen
@ -68,12 +78,18 @@ workspace:
- invariant - invariant
- js-date - js-date
- lazy - lazy
- lcg
- lists - lists
- maybe - maybe
- mmorph
- newtype - newtype
- node-buffer - node-buffer
- node-event-emitter - node-event-emitter
- node-fs
- node-path
- node-stream-pipes
- node-streams - node-streams
- node-zlib
- nonempty - nonempty
- now - now
- nullable - nullable
@ -83,13 +99,18 @@ workspace:
- parallel - parallel
- parsing - parsing
- partial - partial
- pipes
- precise-datetime - precise-datetime
- prelude - prelude
- profunctor - profunctor
- quickcheck
- random
- record - record
- record-extra - record-extra
- refs - refs
- safe-coerce - safe-coerce
- simple-json
- spec
- st - st
- strings - strings
- tailrec - tailrec
@ -100,7 +121,11 @@ workspace:
- unfoldable - unfoldable
- unicode - unicode
- unsafe-coerce - unsafe-coerce
extra_packages: {} - variant
extra_packages:
node-stream-pipes:
git: https://git.orionkindel.com/orion/purescript-node-stream-pipes
ref: v1.0.5
packages: packages:
aff: aff:
type: registry type: registry
@ -124,6 +149,14 @@ packages:
- tailrec - tailrec
- transformers - transformers
- unsafe-coerce - unsafe-coerce
ansi:
type: registry
version: 7.0.0
integrity: sha256-ZMB6HD+q9CXvn9fRCmJ8dvuDrOVHcjombL3oNOerVnE=
dependencies:
- foldable-traversable
- lists
- strings
arraybuffer-types: arraybuffer-types:
type: registry type: registry
version: 3.0.2 version: 3.0.2
@ -148,6 +181,17 @@ packages:
- tuples - tuples
- unfoldable - unfoldable
- unsafe-coerce - unsafe-coerce
avar:
type: registry
version: 5.0.0
integrity: sha256-e7hf0x4hEpcygXP0LtvfvAQ49Bbj2aWtZT3gqM///0A=
dependencies:
- aff
- effect
- either
- exceptions
- functions
- maybe
bifunctors: bifunctors:
type: registry type: registry
version: 6.0.0 version: 6.0.0
@ -158,6 +202,18 @@ packages:
- newtype - newtype
- prelude - prelude
- tuples - tuples
catenable-lists:
type: registry
version: 7.0.0
integrity: sha256-76vYENhwF4BWTBsjeLuErCH2jqVT4M3R1HX+4RwSftA=
dependencies:
- control
- foldable-traversable
- lists
- maybe
- prelude
- tuples
- unfoldable
console: console:
type: registry type: registry
version: 6.1.0 version: 6.1.0
@ -272,17 +328,6 @@ packages:
integrity: sha256-A0JQHpTfo1dNOj9U5/Fd3xndlRSE0g2IQWOGor2yXn8= integrity: sha256-A0JQHpTfo1dNOj9U5/Fd3xndlRSE0g2IQWOGor2yXn8=
dependencies: dependencies:
- unsafe-coerce - unsafe-coerce
filterable:
type: registry
version: 5.0.0
integrity: sha256-cCojJHRnTmpY1j1kegI4CFwghdQ2Fm/8dzM8IlC+lng=
dependencies:
- arrays
- either
- foldable-traversable
- identity
- lists
- ordered-collections
fixed-points: fixed-points:
type: registry type: registry
version: 7.0.0 version: 7.0.0
@ -339,6 +384,12 @@ packages:
- tuples - tuples
- typelevel-prelude - typelevel-prelude
- unfoldable - unfoldable
fork:
type: registry
version: 6.0.0
integrity: sha256-X7u0SuCvFbLbzuNEKLBNuWjmcroqMqit4xEzpQwAP7E=
dependencies:
- aff
formatters: formatters:
type: registry type: registry
version: 7.0.0 version: 7.0.0
@ -351,6 +402,25 @@ packages:
- parsing - parsing
- prelude - prelude
- transformers - transformers
free:
type: registry
version: 7.1.0
integrity: sha256-JAumgEsGSzJCNLD8AaFvuX7CpqS5yruCngi6yI7+V5k=
dependencies:
- catenable-lists
- control
- distributive
- either
- exists
- foldable-traversable
- invariant
- lazy
- maybe
- prelude
- tailrec
- transformers
- tuples
- unsafe-coerce
functions: functions:
type: registry type: registry
version: 6.0.0 version: 6.0.0
@ -434,6 +504,17 @@ packages:
- foldable-traversable - foldable-traversable
- invariant - invariant
- prelude - prelude
lcg:
type: registry
version: 4.0.0
integrity: sha256-h7ME5cthLfbgJOJdsZcSfFpwXsx4rf8YmhebU+3iSYg=
dependencies:
- effect
- integers
- maybe
- partial
- prelude
- random
lists: lists:
type: registry type: registry
version: 7.0.0 version: 7.0.0
@ -460,6 +541,14 @@ packages:
- invariant - invariant
- newtype - newtype
- prelude - prelude
mmorph:
type: registry
version: 7.0.0
integrity: sha256-urZlZNNqGeQFe5D/ClHlR8QgGBNHTMFPtJ5S5IpflTQ=
dependencies:
- free
- functors
- transformers
newtype: newtype:
type: registry type: registry
version: 5.0.0 version: 5.0.0
@ -490,6 +579,62 @@ packages:
- nullable - nullable
- prelude - prelude
- unsafe-coerce - unsafe-coerce
node-fs:
type: registry
version: 9.1.0
integrity: sha256-TzhvGdrwcM0bazDvrWSqh+M/H8GKYf1Na6aGm2Qg4+c=
dependencies:
- datetime
- effect
- either
- enums
- exceptions
- functions
- integers
- js-date
- maybe
- node-buffer
- node-path
- node-streams
- nullable
- partial
- prelude
- strings
- unsafe-coerce
node-path:
type: registry
version: 5.0.0
integrity: sha256-pd82nQ+2l5UThzaxPdKttgDt7xlsgIDLpPG0yxDEdyE=
dependencies:
- effect
node-stream-pipes:
type: git
url: https://git.orionkindel.com/orion/purescript-node-stream-pipes
rev: f2f18c3c13ae2f0f5787ccfb3832fc8c653e83ad
dependencies:
- aff
- arrays
- effect
- either
- exceptions
- foldable-traversable
- maybe
- mmorph
- newtype
- node-buffer
- node-event-emitter
- node-fs
- node-path
- node-streams
- node-zlib
- parallel
- pipes
- prelude
- st
- strings
- tailrec
- transformers
- unsafe-coerce
node-streams: node-streams:
type: registry type: registry
version: 9.0.0 version: 9.0.0
@ -503,6 +648,19 @@ packages:
- node-event-emitter - node-event-emitter
- nullable - nullable
- prelude - prelude
node-zlib:
type: registry
version: 0.4.0
integrity: sha256-kYSajFQFzWVg71l5/y4w4kXdTr5EJoqyV3D2RqmAjQ4=
dependencies:
- aff
- effect
- either
- functions
- node-buffer
- node-streams
- prelude
- unsafe-coerce
nonempty: nonempty:
type: registry type: registry
version: 7.0.0 version: 7.0.0
@ -610,6 +768,18 @@ packages:
version: 4.0.0 version: 4.0.0
integrity: sha256-fwXerld6Xw1VkReh8yeQsdtLVrjfGiVuC5bA1Wyo/J4= integrity: sha256-fwXerld6Xw1VkReh8yeQsdtLVrjfGiVuC5bA1Wyo/J4=
dependencies: [] dependencies: []
pipes:
type: registry
version: 8.0.0
integrity: sha256-kvfqGM4cPA/wCcBHbp5psouFw5dZGvku2462x7ZBwSY=
dependencies:
- aff
- lists
- mmorph
- prelude
- tailrec
- transformers
- tuples
precise-datetime: precise-datetime:
type: registry type: registry
version: 7.0.0 version: 7.0.0
@ -650,6 +820,45 @@ packages:
- newtype - newtype
- prelude - prelude
- tuples - tuples
quickcheck:
type: registry
version: 8.0.1
integrity: sha256-ZvpccKQCvgslTXZCNmpYW4bUsFzhZd/kQUr2WmxFTGY=
dependencies:
- arrays
- console
- control
- effect
- either
- enums
- exceptions
- foldable-traversable
- gen
- identity
- integers
- lazy
- lcg
- lists
- maybe
- newtype
- nonempty
- numbers
- partial
- prelude
- record
- st
- strings
- tailrec
- transformers
- tuples
- unfoldable
random:
type: registry
version: 6.0.0
integrity: sha256-CJ611a35MPCE7XQMp0rdC6MCn76znlhisiCRgboAG+Q=
dependencies:
- effect
- integers
record: record:
type: registry type: registry
version: 4.0.0 version: 4.0.0
@ -683,6 +892,52 @@ packages:
integrity: sha256-a1ibQkiUcbODbLE/WAq7Ttbbh9ex+x33VCQ7GngKudU= integrity: sha256-a1ibQkiUcbODbLE/WAq7Ttbbh9ex+x33VCQ7GngKudU=
dependencies: dependencies:
- unsafe-coerce - unsafe-coerce
simple-json:
type: registry
version: 9.0.0
integrity: sha256-K3RJaThqsszTd+TEklzZmAdDqvIHWgXIfKqlsoykU1c=
dependencies:
- arrays
- exceptions
- foreign
- foreign-object
- nullable
- prelude
- record
- typelevel-prelude
- variant
spec:
type: registry
version: 7.6.0
integrity: sha256-+merGdQbL9zWONbnt8S8J9afGJ59MQqGtS0qSd3yu4I=
dependencies:
- aff
- ansi
- arrays
- avar
- bifunctors
- control
- datetime
- effect
- either
- exceptions
- foldable-traversable
- fork
- identity
- integers
- lists
- maybe
- newtype
- now
- ordered-collections
- parallel
- pipes
- prelude
- refs
- strings
- tailrec
- transformers
- tuples
st: st:
type: registry type: registry
version: 6.2.0 version: 6.2.0
@ -788,3 +1043,16 @@ packages:
version: 6.0.0 version: 6.0.0
integrity: sha256-IqIYW4Vkevn8sI+6aUwRGvd87tVL36BBeOr0cGAE7t0= integrity: sha256-IqIYW4Vkevn8sI+6aUwRGvd87tVL36BBeOr0cGAE7t0=
dependencies: [] dependencies: []
variant:
type: registry
version: 8.0.0
integrity: sha256-SR//zQDg2dnbB8ZHslcxieUkCeNlbMToapvmh9onTtw=
dependencies:
- enums
- lists
- maybe
- partial
- prelude
- record
- tuples
- unsafe-coerce

View File

@ -10,16 +10,16 @@ package:
strict: true strict: true
pedanticPackages: true pedanticPackages: true
dependencies: dependencies:
- foldable-traversable
- node-stream-pipes
- st
- tuples
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0" - bifunctors: ">=6.0.0 <7.0.0"
- control: ">=6.0.0 <7.0.0"
- datetime: ">=6.1.0 <7.0.0" - datetime: ">=6.1.0 <7.0.0"
- effect: ">=4.0.0 <5.0.0" - effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0"
- filterable: ">=5.0.0 <6.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0" - foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0" - foreign-object: ">=4.1.0 <5.0.0"
- integers: ">=6.0.0 <7.0.0" - integers: ">=6.0.0 <7.0.0"
@ -32,11 +32,11 @@ package:
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0" - numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- pipes: ">=8.0.0 <9.0.0"
- precise-datetime: ">=7.0.0 <8.0.0" - precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"
- record-extra: ">=5.0.1 <6.0.0" - record-extra: ">=5.0.1 <6.0.0"
- st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0"
@ -46,5 +46,14 @@ package:
main: Test.Main main: Test.Main
dependencies: dependencies:
- console - console
- gen
- node-fs
- node-zlib
- quickcheck
- simple-json
- spec
workspace: workspace:
extraPackages: {} extraPackages:
node-stream-pipes:
git: 'https://git.orionkindel.com/orion/purescript-node-stream-pipes'
ref: 'v1.0.5'

View File

@ -1,30 +1,7 @@
import { parse, Parser } from "csv-parse"; import { Parser } from "csv-parse";
class ParserWithColumns extends Parser { /** @type {(s: import('csv-parse').Options) => () => Parser} */
/** @type {Array<string>} */ export const makeImpl = (c) => () => new Parser(c);
columns = [];
/** @type {Map<string, number> | null} */
columnsMap = null;
}
/** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */ /** @type {(s: Parser) => () => Array<string> | null} */
export const makeImpl = (c) => () => {
const parser = new ParserWithColumns(c);
parser.once("readable", () => {
parser.columns = parser.read();
parser.emit("columns", parser.columns);
});
return parser;
};
/** @type {(s: ParserWithColumns) => () => Array<string> | null} */
export const readImpl = (p) => () => p.read(); export const readImpl = (p) => () => p.read();
/** @type {(s: ParserWithColumns) => () => Array<string>} */
export const columnsArrayImpl = (p) => () => p.columns;
/** @type {(s: ParserWithColumns) => () => Map<string, number> | null} */
export const columnsMapImpl = (p) => () => p.columnsMap;
/** @type {(s: ParserWithColumns) => (m: Map<string, number>) => () => void} */
export const setColumnsMapImpl = (p) => (m) => () => (p.columnsMap = m);

View File

@ -2,45 +2,18 @@ module Node.Stream.CSV.Parse where
import Prelude hiding (join) import Prelude hiding (join)
import Control.Alt ((<|>))
import Control.Monad.Error.Class (liftEither, liftMaybe)
import Control.Monad.Except (runExcept)
import Control.Monad.Except.Trans (catchError)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (untilJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Global as ST
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap)
import Data.CSV.Record (class ReadCSVRecord, readCSVRecord)
import Data.Either (Either(..))
import Data.Filterable (filter)
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (wrap)
import Data.Nullable (Nullable) import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Canceler(..), delay, launchAff_, makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Effect.Uncurried (mkEffectFn1) import Effect.Uncurried (mkEffectFn1)
import Foreign (Foreign, unsafeToForeign) import Foreign (Foreign, unsafeToForeign)
import Foreign.Object (Object) import Foreign.Object (Object)
import Foreign.Object as Object import Foreign.Object (union) as Object
import Node.Encoding (Encoding(..)) import Node.Buffer (Buffer)
import Node.EventEmitter (EventHandle(..)) import Node.EventEmitter (EventHandle(..))
import Node.EventEmitter as Event
import Node.EventEmitter.UtilTypes (EventHandle1) import Node.EventEmitter.UtilTypes (EventHandle1)
import Node.Stream (Read, Stream, Write) import Node.Stream (Read, Stream, Write)
import Node.Stream as Stream import Node.Stream.Object (Transform) as Object
import Prim.Row (class Union) import Prim.Row (class Union)
import Prim.RowList (class RowToList)
import Unsafe.Coerce (unsafeCoerce) import Unsafe.Coerce (unsafeCoerce)
data CSVRead data CSVRead
@ -49,12 +22,9 @@ data CSVRead
-- | into parsed purescript objects. -- | into parsed purescript objects.
-- | -- |
-- | The CSV contents may be piped into this stream -- | The CSV contents may be piped into this stream
-- | as Buffer or String encoded chunks. -- | as Buffer or String chunks.
-- | type CSVParser :: Row Type -> Type
-- | Records can be read with `read` when `Node.Stream.readable` type CSVParser r = Stream (read :: Read, write :: Write, csv :: CSVRead | r)
-- | is true.
type CSVParser :: Row Type -> Row Type -> Type
type CSVParser a r = Stream (read :: Read, write :: Write, csv :: CSVRead | r)
-- | https://csv.js.org/parse/options/ -- | https://csv.js.org/parse/options/
type Config r = type Config r =
@ -86,136 +56,22 @@ type Config r =
) )
-- | Create a CSVParser -- | Create a CSVParser
make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVParser r ()) make :: forall @config @missing @extra. Union config missing (Config extra) => { | config } -> Effect (CSVParser ())
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign
-- | Synchronously parse a CSV string toObjectStream :: CSVParser () -> Object.Transform Buffer (Array String)
parse toObjectStream = unsafeCoerce
:: forall @r rl @config missing extra m
. MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> Union config missing (Config extra)
=> { | config }
-> String
-> m (Array { | r })
parse config csv = do
stream <- liftEffect $ make @r @config @missing @extra config
void $ liftEffect $ Stream.writeString stream UTF8 csv
liftEffect $ Stream.end stream
readAll stream
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach
:: forall @r rl x m
. MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r x
-> ({ | r } -> Effect Unit)
-> m Unit
foreach stream cb = do
alreadyHaveCols <- liftEffect $ getOrInitColumnsMap stream
when (isNothing alreadyHaveCols)
$ liftAff
$ makeAff \res -> do
stop <- flip (Event.once columnsH) stream $ const do
void $ getOrInitColumnsMap stream
res $ Right unit
pure $ Canceler $ const $ liftEffect stop
liftAff $ makeAff \res -> do
count <- ST.toEffect $ STRef.new 0
removeDataListener <- flip (Event.on dataH) stream \row ->
void
$ flip catchError (res <<< Left)
$ do
void $ ST.toEffect $ STRef.modify (_ + 1) count
cols <- liftMaybe (error "unreachable") =<< getOrInitColumnsMap stream
record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row
flip catchError (liftEffect <<< res <<< Left) (cb record)
void $ ST.toEffect $ STRef.modify (_ - 1) count
removeErrorListener <- flip (Event.once Stream.errorH) stream (res <<< Left)
removeEndListener <- flip (Event.once Stream.endH) stream $ launchAff_ do
untilJust do
delay $ wrap 10.0
ct <- liftEffect $ ST.toEffect $ STRef.read count
pure $ if ct <= 1 then Just unit else Nothing
liftEffect $ res $ Right unit
pure $ Canceler $ const $ liftEffect do
removeDataListener
removeEndListener
removeErrorListener
-- | Reads a parsed record from the stream.
-- |
-- | Returns `Nothing` when either:
-- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`)
-- | - All records have been processed (`Node.Stream.closed` is `true`)
read
:: forall @r rl a
. RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> Effect (Maybe { | r })
read stream = runMaybeT do
cols <- MaybeT $ getOrInitColumnsMap stream
raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream
liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw
-- | Collect all parsed records into an array
readAll
:: forall @r rl a m
. MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> m (Array { | r })
readAll stream = do
records <- liftEffect $ liftST $ Array.ST.new
foreach stream $ void <<< liftEffect <<< liftST <<< flip Array.ST.push records
liftEffect $ liftST $ Array.ST.unsafeFreeze records
-- | `data` event. Emitted when a CSV record has been parsed. -- | `data` event. Emitted when a CSV record has been parsed.
dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String) dataH :: forall a. EventHandle1 (CSVParser a) (Array String)
dataH = EventHandle "data" mkEffectFn1 dataH = EventHandle "data" mkEffectFn1
-- | `columns` event. Emitted when the header row has been parsed.
columnsH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
columnsH = EventHandle "columns" mkEffectFn1
-- | FFI -- | FFI
foreign import makeImpl :: forall r. Foreign -> Effect (Stream r) foreign import makeImpl :: forall r. Foreign -> Effect (Stream r)
-- | FFI -- | FFI
foreign import readImpl :: forall r. Stream r -> Effect (Nullable (Array String)) foreign import readImpl :: forall r. Stream r -> Effect (Nullable (Array String))
-- | FFI
foreign import columnsArrayImpl :: forall r. Stream r -> Effect (Array String)
-- | FFI
foreign import columnsMapImpl :: forall r. Stream r -> Effect (Nullable (Map String Int))
-- | FFI
foreign import setColumnsMapImpl :: forall r. Stream r -> Map String Int -> Effect Unit
-- | FFI
getOrInitColumnsMap :: forall r x. CSVParser r x -> Effect (Maybe (Map String Int))
getOrInitColumnsMap s = runMaybeT do
cols :: Array String <- MaybeT $ filter (not <<< Array.null) <$> Just <$> columnsArrayImpl s
let
get = MaybeT $ Nullable.toMaybe <$> columnsMapImpl s
init = do
let
ixs = Array.range 0 (Array.length cols - 1)
assoc = Array.zip cols ixs
map = Map.fromFoldable assoc
lift $ setColumnsMapImpl s map
pure map
get <|> init
-- | FFI -- | FFI
recordToForeign :: forall r. Record r -> Object Foreign recordToForeign :: forall r. Record r -> Object Foreign
recordToForeign = unsafeCoerce recordToForeign = unsafeCoerce

View File

@ -2,30 +2,16 @@ module Node.Stream.CSV.Stringify where
import Prelude import Prelude
import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.CSV.Record (class WriteCSVRecord, writeCSVRecord) import Data.CSV.Record (class WriteCSVRecord, writeCSVRecord)
import Data.Either (Either(..), blush)
import Data.Foldable (class Foldable, fold)
import Data.Maybe (Maybe(..))
import Data.String.Regex (Regex) import Data.String.Regex (Regex)
import Data.Traversable (for_)
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Canceler(..), makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Foreign (Foreign, unsafeToForeign) import Foreign (Foreign, unsafeToForeign)
import Foreign.Object (Object) import Foreign.Object (Object)
import Foreign.Object as Object import Foreign.Object (union) as Object
import Node.EventEmitter as Event
import Node.Stream (Read, Stream, Write) import Node.Stream (Read, Stream, Write)
import Node.Stream as Stream import Node.Stream.Object (Transform) as Object
import Prim.Row (class Union) import Prim.Row (class Union)
import Prim.RowList (class RowToList) import Prim.RowList (class RowToList)
import Record.Extra (class Keys, keys)
import Type.Prelude (Proxy(..))
import Unsafe.Coerce (unsafeCoerce) import Unsafe.Coerce (unsafeCoerce)
data CSVWrite data CSVWrite
@ -38,8 +24,8 @@ data CSVWrite
-- | Stringified rows are emitted on the `Readable` end as string -- | Stringified rows are emitted on the `Readable` end as string
-- | chunks, meaning it can be treated as a `Node.Stream.Readable` -- | chunks, meaning it can be treated as a `Node.Stream.Readable`
-- | that has had `setEncoding UTF8` invoked on it. -- | that has had `setEncoding UTF8` invoked on it.
type CSVStringifier :: Row Type -> Row Type -> Type type CSVStringifier :: Row Type -> Type
type CSVStringifier a r = Stream (read :: Read, write :: Write, csv :: CSVWrite | r) type CSVStringifier r = Stream (read :: Read, write :: Write, csv :: CSVWrite | r)
-- | https://csv.js.org/stringify/options/ -- | https://csv.js.org/stringify/options/
type Config r = type Config r =
@ -48,7 +34,6 @@ type Config r =
, record_delimiter :: String , record_delimiter :: String
, escape :: String , escape :: String
, escape_formulas :: Boolean , escape_formulas :: Boolean
, header :: Boolean
, quote :: String , quote :: String
, quoted :: Boolean , quoted :: Boolean
, quoted_empty :: Boolean , quoted_empty :: Boolean
@ -63,43 +48,36 @@ foreign import writeImpl :: forall r. Stream r -> Array String -> Effect Unit
recordToForeign :: forall r. Record r -> Object Foreign recordToForeign :: forall r. Record r -> Object Foreign
recordToForeign = unsafeCoerce recordToForeign = unsafeCoerce
-- | Create a CSVStringifier -- | Create a raw Transform stream that accepts chunks of `Array String`,
make :: forall @r rl @config @missing @extra. Keys rl => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVStringifier r ()) -- | and transforms them into string CSV rows.
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: Array.fromFoldable $ keys (Proxy @r) }) <<< recordToForeign -- |
-- | Requires an ordered array of column names.
make
:: forall @config @missing @extra
. Union config missing (Config extra)
=> Array String
-> { | config }
-> Effect (CSVStringifier ())
make columns =
makeImpl
<<< unsafeToForeign
<<< Object.union (recordToForeign { columns, header: true })
<<< recordToForeign
-- | Synchronously stringify a collection of records -- | Convert the raw stream to a typed ObjectStream
stringify :: forall @r rl f m @config missing extra. MonadAff m => MonadRec m => Keys rl => Foldable f => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> f { | r } -> m String toObjectStream :: CSVStringifier () -> Object.Transform (Array String) String
stringify config records = do toObjectStream = unsafeCoerce
stream <- liftEffect $ make @r @config @missing @extra config
liftEffect $ for_ records \r -> write stream r
liftEffect $ Stream.end stream
readAll stream
-- | Write a record to a CSVStringifier. -- | Write a record to a CSVStringifier.
-- | -- |
-- | The record will be emitted on the `Readable` end -- | The record will be emitted on the `Readable` end
-- | of the stream as a string chunk. -- | of the stream as a string chunk.
write :: forall @r rl a. RowToList r rl => WriteCSVRecord r rl => CSVStringifier r a -> { | r } -> Effect Unit write :: forall @r rl a. RowToList r rl => WriteCSVRecord r rl => CSVStringifier a -> { | r } -> Effect Unit
write s = writeImpl s <<< writeCSVRecord @r @rl write s = writeImpl s <<< writeCSVRecord @r @rl
-- | Loop until the stream is closed, invoking the callback with each chunk of stringified CSV text. -- | Write a record to a CSVStringifier.
foreach :: forall m r x. MonadAff m => MonadRec m => CSVStringifier r x -> (String -> m Unit) -> m Unit -- |
foreach stream cb = whileJust do -- | The record will be emitted on the `Readable` end
isReadable <- liftEffect $ Stream.readable stream -- | of the stream as a string chunk.
liftAff $ when (not isReadable) $ makeAff \res -> do writeRaw :: forall a. CSVStringifier a -> Array String -> Effect Unit
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit writeRaw = writeImpl
pure $ Canceler $ const $ liftEffect stop
whileJust do
s <- liftEffect $ (join <<< map blush) <$> Stream.readEither stream
for_ s cb
pure $ void s
isClosed <- liftEffect $ Stream.closed stream
pure $ if isClosed then Nothing else Just unit
-- | Read the stringified chunks until end-of-stream, returning the entire CSV string.
readAll :: forall r a m. MonadAff m => MonadRec m => CSVStringifier r a -> m String
readAll stream = do
chunks <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push chunks
chunks' <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze chunks
pure $ fold chunks'

98
src/Pipes.CSV.purs Normal file
View File

@ -0,0 +1,98 @@
module Pipes.CSV where
import Prelude
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept)
import Control.Monad.Rec.Class (forever)
import Control.Monad.ST.Global as ST
import Control.Monad.ST.Ref as STRef
import Data.Array as Array
import Data.Bifunctor (lmap)
import Data.CSV.Record (class ReadCSVRecord, class WriteCSVRecord, readCSVRecord, writeCSVRecord)
import Data.FunctorWithIndex (mapWithIndex)
import Data.Map as Map
import Data.Maybe (Maybe(..))
import Data.Tuple.Nested ((/\))
import Effect.Aff (Aff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Node.Buffer (Buffer)
import Node.Stream.CSV.Parse as CSV.Parse
import Node.Stream.CSV.Stringify as CSV.Stringify
import Pipes (await, yield, (>->))
import Pipes.Core (Pipe)
import Pipes.Node.Stream as Pipes.Stream
import Prim.RowList (class RowToList)
import Record.Extra (class Keys, keys)
import Type.Prelude (Proxy(..))
-- | Transforms buffer chunks of a CSV file to parsed
-- | records of `r`.
-- |
-- | ```
-- | -- == my-data.csv.gz ==
-- | -- id,foo,is_deleted
-- | -- 1,hi,f
-- | -- 2,bye,t
-- |
-- | rows
-- | :: Array {id :: Int, foo :: String, is_deleted :: Boolean}
-- | <- map Array.fromFoldable
-- | $ Pipes.toListM
-- | $ Pipes.Node.Stream.unEOS
-- | $ Pipes.Node.FS.read "my-data.csv.gz"
-- | >-> Pipes.Node.Zlib.gunzip
-- | >-> Pipes.CSV.parse
-- | rows `shouldEqual` [{id: 1, foo: "hi", is_deleted: false}, {id: 2, foo: "bye", is_deleted: true}]
-- | ```
parse
:: forall @r rl
. RowToList r rl
=> ReadCSVRecord r rl
=> Pipe (Maybe Buffer) (Maybe { | r }) Aff Unit
parse = do
raw <- liftEffect $ CSV.Parse.make {}
colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing
let
readCols = liftEffect $ ST.toEffect $ STRef.read colsST
putCols a = void $ liftEffect $ ST.toEffect $ STRef.write (Just a) colsST
parse' a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a
firstRow a = putCols $ Map.fromFoldable $ mapWithIndex (flip (/\)) a
row a cols' = yield =<< parse' a cols'
unmarshal = forever do
r <- await
cols <- readCols
case cols of
Just cols' -> row r cols'
Nothing -> firstRow r
parser = Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream raw
parser >-> Pipes.Stream.inEOS unmarshal
-- | Transforms buffer chunks of a CSV file to parsed
-- | arrays of CSV values.
parseRaw :: Pipe (Maybe Buffer) (Maybe (Array String)) Aff Unit
parseRaw = do
s <- liftEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
Pipes.Stream.fromTransform s
-- | Transforms CSV rows into stringified CSV records
-- | using the given ordered array of column names.
stringifyRaw :: Array String -> Pipe (Maybe (Array String)) (Maybe String) Aff Unit
stringifyRaw columns = do
s <- liftEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {}
Pipes.Stream.fromTransform s
-- | Transforms purescript records into stringified CSV records.
-- |
-- | Columns are inferred from the record's keys, ordered alphabetically.
stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => Pipe (Maybe { | r }) (Maybe String) Aff Unit
stringify = do
raw <- liftEffect $ CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {}
let
printer = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream raw
marshal = forever $ yield =<< (writeCSVRecord @r @rl <$> await)
Pipes.Stream.inEOS marshal >-> printer

View File

@ -2,11 +2,13 @@ module Test.Main where
import Prelude import Prelude
import Data.Maybe (Maybe(..))
import Effect (Effect) import Effect (Effect)
import Effect.Class.Console (log) import Effect.Aff (launchAff_)
import Test.Pipes.CSV as Test.Pipes.CSV
import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec')
main :: Effect Unit main :: Effect Unit
main = do main = launchAff_ $ runSpec' (defaultConfig { timeout = Nothing }) [ specReporter ] do
log "🍕" Test.Pipes.CSV.spec
log "You should add some tests."

88
test/Test/Pipes.CSV.purs Normal file
View File

@ -0,0 +1,88 @@
module Test.Pipes.CSV where
import Prelude
import Control.Monad.Gen (chooseInt)
import Control.Monad.Rec.Class (Step(..), tailRecM)
import Data.Array as Array
import Data.DateTime (DateTime)
import Data.Foldable (fold)
import Data.Maybe (Maybe(..), fromJust)
import Data.Newtype (wrap)
import Data.PreciseDateTime (fromRFC3339String, toDateTimeLossy)
import Effect.Class (liftEffect)
import Node.Encoding (Encoding(..))
import Partial.Unsafe (unsafePartial)
import Pipes (yield, (>->))
import Pipes (each) as Pipes
import Pipes.CSV as Pipes.CSV
import Pipes.Collect as Pipes.Collect
import Pipes.Node.Buffer as Pipes.Buffer
import Pipes.Node.Stream as Pipes.Stream
import Pipes.Prelude (map, toListM) as Pipes
import Pipes.Util as Pipes.Util
import Test.QuickCheck.Gen (randomSample')
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (shouldEqual)
csv :: String
csv = """created,flag,foo,id
2020-01-01T00:00:00.0Z,true,a,1
2024-02-02T08:00:00.0Z,false,apple,2
1970-01-01T00:00:00.0Z,true,hello,3
"""
dt :: String -> DateTime
dt = toDateTimeLossy <<< unsafePartial fromJust <<< fromRFC3339String <<< wrap
spec :: Spec Unit
spec =
describe "Pipes.CSV" do
it "stringify" do
let
objs =
[ {id: 1, foo: "a", flag: true, created: dt "2020-01-01T00:00:00Z"}
, {id: 2, foo: "apple", flag: false, created: dt "2024-02-02T08:00:00Z"}
, {id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z"}
]
csv' <- map fold $ Pipes.Collect.collectArray $ Pipes.Stream.withEOS (Pipes.each objs) >-> Pipes.CSV.stringify >-> Pipes.Stream.unEOS
csv' `shouldEqual` csv
describe "parse" do
it "parses csv" do
rows <- map Array.fromFoldable
$ Pipes.toListM
$ Pipes.Stream.withEOS (yield csv)
>-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8)
>-> Pipes.CSV.parse
>-> Pipes.Stream.unEOS
rows `shouldEqual`
[ {id: 1, foo: "a", flag: true, created: dt "2020-01-01T00:00:00Z"}
, {id: 2, foo: "apple", flag: false, created: dt "2024-02-02T08:00:00Z"}
, {id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z"}
]
it "parses large csv" do
nums <- liftEffect $ randomSample' 100000 (chooseInt 0 9)
let
csvRows = ["id\n"] <> ((_ <> "\n") <$> show <$> nums)
csv' =
let
go ix
| Just a <- Array.index csvRows ix = yield a $> Loop (ix + 1)
| otherwise = pure $ Done unit
in
tailRecM go 0
in16kbChunks =
Pipes.Util.chunked 16000
>-> Pipes.Stream.inEOS (Pipes.map fold)
>-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8)
rows <-
Pipes.Collect.collectArray
$ Pipes.Stream.withEOS csv'
>-> in16kbChunks
>-> Pipes.CSV.parse
>-> Pipes.Stream.unEOS
rows `shouldEqual` ((\id -> {id}) <$> nums)