diff --git a/.gitignore b/.gitignore index c470536..639514a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ - bower_components/ node_modules/ .pulp-cache/ @@ -10,3 +9,4 @@ generated-docs/ .purs* .psa* .spago +.tmp/ diff --git a/bun.lockb b/bun.lockb index 8cae2be..4e03452 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index 532ee5a..f276774 100644 --- a/package.json +++ b/package.json @@ -1,9 +1,11 @@ { "name": "purescript-csv-stream", "version": "v1.2.19", + "type": "module", "dependencies": { "csv-parse": "^5.5.5", - "csv-stringify": "^6.4.6" + "csv-stringify": "^6.4.6", + "decimal.js": "^10.4.3" }, "devDependencies": { "typescript": "^5.4.5" diff --git a/spago.lock b/spago.lock index 4543e72..9cffae7 100644 --- a/spago.lock +++ b/spago.lock @@ -6,13 +6,10 @@ workspace: - aff: ">=7.1.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0" - bifunctors: ">=6.0.0 <7.0.0" - - control: ">=6.0.0 <7.0.0" - datetime: ">=6.1.0 <7.0.0" - effect: ">=4.0.0 <5.0.0" - - either: ">=6.1.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0" - - filterable: ">=5.0.0 <6.0.0" - - foldable-traversable: ">=6.0.0 <7.0.0" + - foldable-traversable - foreign: ">=7.0.0 <8.0.0" - foreign-object: ">=4.1.0 <5.0.0" - integers: ">=6.0.0 <7.0.0" @@ -21,27 +18,39 @@ workspace: - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" + - node-stream-pipes - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - numbers: ">=9.0.1 <10.0.0" - ordered-collections: ">=3.2.0 <4.0.0" + - pipes: ">=8.0.0 <9.0.0" - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" - record: ">=4.0.0 <5.0.0" - record-extra: ">=5.0.1 <6.0.0" - - st: ">=6.2.0 <7.0.0" + - st - strings: ">=6.0.1 <7.0.0" - tailrec: ">=6.1.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0" + - tuples - typelevel-prelude: ">=7.0.0 <8.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0" test_dependencies: - console + - gen + - node-fs + - node-zlib + - quickcheck + - simple-json + - spec build_plan: - aff + - ansi - arraybuffer-types - arrays + - avar - bifunctors + - catenable-lists - console - const - contravariant @@ -54,12 +63,13 @@ workspace: - enums - exceptions - exists - - filterable - fixed-points - foldable-traversable - foreign - foreign-object + - fork - formatters + - free - functions - functors - gen @@ -68,12 +78,18 @@ workspace: - invariant - js-date - lazy + - lcg - lists - maybe + - mmorph - newtype - node-buffer - node-event-emitter + - node-fs + - node-path + - node-stream-pipes - node-streams + - node-zlib - nonempty - now - nullable @@ -83,13 +99,18 @@ workspace: - parallel - parsing - partial + - pipes - precise-datetime - prelude - profunctor + - quickcheck + - random - record - record-extra - refs - safe-coerce + - simple-json + - spec - st - strings - tailrec @@ -100,7 +121,11 @@ workspace: - unfoldable - unicode - unsafe-coerce - extra_packages: {} + - variant + extra_packages: + node-stream-pipes: + git: https://git.orionkindel.com/orion/purescript-node-stream-pipes + ref: v1.0.5 packages: aff: type: registry @@ -124,6 +149,14 @@ packages: - tailrec - transformers - unsafe-coerce + ansi: + type: registry + version: 7.0.0 + integrity: sha256-ZMB6HD+q9CXvn9fRCmJ8dvuDrOVHcjombL3oNOerVnE= + dependencies: + - foldable-traversable + - lists + - strings arraybuffer-types: type: registry version: 3.0.2 @@ -148,6 +181,17 @@ packages: - tuples - unfoldable - unsafe-coerce + avar: + type: registry + version: 5.0.0 + integrity: sha256-e7hf0x4hEpcygXP0LtvfvAQ49Bbj2aWtZT3gqM///0A= + dependencies: + - aff + - effect + - either + - exceptions + - functions + - maybe bifunctors: type: registry version: 6.0.0 @@ -158,6 +202,18 @@ packages: - newtype - prelude - tuples + catenable-lists: + type: registry + version: 7.0.0 + integrity: sha256-76vYENhwF4BWTBsjeLuErCH2jqVT4M3R1HX+4RwSftA= + dependencies: + - control + - foldable-traversable + - lists + - maybe + - prelude + - tuples + - unfoldable console: type: registry version: 6.1.0 @@ -272,17 +328,6 @@ packages: integrity: sha256-A0JQHpTfo1dNOj9U5/Fd3xndlRSE0g2IQWOGor2yXn8= dependencies: - unsafe-coerce - filterable: - type: registry - version: 5.0.0 - integrity: sha256-cCojJHRnTmpY1j1kegI4CFwghdQ2Fm/8dzM8IlC+lng= - dependencies: - - arrays - - either - - foldable-traversable - - identity - - lists - - ordered-collections fixed-points: type: registry version: 7.0.0 @@ -339,6 +384,12 @@ packages: - tuples - typelevel-prelude - unfoldable + fork: + type: registry + version: 6.0.0 + integrity: sha256-X7u0SuCvFbLbzuNEKLBNuWjmcroqMqit4xEzpQwAP7E= + dependencies: + - aff formatters: type: registry version: 7.0.0 @@ -351,6 +402,25 @@ packages: - parsing - prelude - transformers + free: + type: registry + version: 7.1.0 + integrity: sha256-JAumgEsGSzJCNLD8AaFvuX7CpqS5yruCngi6yI7+V5k= + dependencies: + - catenable-lists + - control + - distributive + - either + - exists + - foldable-traversable + - invariant + - lazy + - maybe + - prelude + - tailrec + - transformers + - tuples + - unsafe-coerce functions: type: registry version: 6.0.0 @@ -434,6 +504,17 @@ packages: - foldable-traversable - invariant - prelude + lcg: + type: registry + version: 4.0.0 + integrity: sha256-h7ME5cthLfbgJOJdsZcSfFpwXsx4rf8YmhebU+3iSYg= + dependencies: + - effect + - integers + - maybe + - partial + - prelude + - random lists: type: registry version: 7.0.0 @@ -460,6 +541,14 @@ packages: - invariant - newtype - prelude + mmorph: + type: registry + version: 7.0.0 + integrity: sha256-urZlZNNqGeQFe5D/ClHlR8QgGBNHTMFPtJ5S5IpflTQ= + dependencies: + - free + - functors + - transformers newtype: type: registry version: 5.0.0 @@ -490,6 +579,62 @@ packages: - nullable - prelude - unsafe-coerce + node-fs: + type: registry + version: 9.1.0 + integrity: sha256-TzhvGdrwcM0bazDvrWSqh+M/H8GKYf1Na6aGm2Qg4+c= + dependencies: + - datetime + - effect + - either + - enums + - exceptions + - functions + - integers + - js-date + - maybe + - node-buffer + - node-path + - node-streams + - nullable + - partial + - prelude + - strings + - unsafe-coerce + node-path: + type: registry + version: 5.0.0 + integrity: sha256-pd82nQ+2l5UThzaxPdKttgDt7xlsgIDLpPG0yxDEdyE= + dependencies: + - effect + node-stream-pipes: + type: git + url: https://git.orionkindel.com/orion/purescript-node-stream-pipes + rev: f2f18c3c13ae2f0f5787ccfb3832fc8c653e83ad + dependencies: + - aff + - arrays + - effect + - either + - exceptions + - foldable-traversable + - maybe + - mmorph + - newtype + - node-buffer + - node-event-emitter + - node-fs + - node-path + - node-streams + - node-zlib + - parallel + - pipes + - prelude + - st + - strings + - tailrec + - transformers + - unsafe-coerce node-streams: type: registry version: 9.0.0 @@ -503,6 +648,19 @@ packages: - node-event-emitter - nullable - prelude + node-zlib: + type: registry + version: 0.4.0 + integrity: sha256-kYSajFQFzWVg71l5/y4w4kXdTr5EJoqyV3D2RqmAjQ4= + dependencies: + - aff + - effect + - either + - functions + - node-buffer + - node-streams + - prelude + - unsafe-coerce nonempty: type: registry version: 7.0.0 @@ -610,6 +768,18 @@ packages: version: 4.0.0 integrity: sha256-fwXerld6Xw1VkReh8yeQsdtLVrjfGiVuC5bA1Wyo/J4= dependencies: [] + pipes: + type: registry + version: 8.0.0 + integrity: sha256-kvfqGM4cPA/wCcBHbp5psouFw5dZGvku2462x7ZBwSY= + dependencies: + - aff + - lists + - mmorph + - prelude + - tailrec + - transformers + - tuples precise-datetime: type: registry version: 7.0.0 @@ -650,6 +820,45 @@ packages: - newtype - prelude - tuples + quickcheck: + type: registry + version: 8.0.1 + integrity: sha256-ZvpccKQCvgslTXZCNmpYW4bUsFzhZd/kQUr2WmxFTGY= + dependencies: + - arrays + - console + - control + - effect + - either + - enums + - exceptions + - foldable-traversable + - gen + - identity + - integers + - lazy + - lcg + - lists + - maybe + - newtype + - nonempty + - numbers + - partial + - prelude + - record + - st + - strings + - tailrec + - transformers + - tuples + - unfoldable + random: + type: registry + version: 6.0.0 + integrity: sha256-CJ611a35MPCE7XQMp0rdC6MCn76znlhisiCRgboAG+Q= + dependencies: + - effect + - integers record: type: registry version: 4.0.0 @@ -683,6 +892,52 @@ packages: integrity: sha256-a1ibQkiUcbODbLE/WAq7Ttbbh9ex+x33VCQ7GngKudU= dependencies: - unsafe-coerce + simple-json: + type: registry + version: 9.0.0 + integrity: sha256-K3RJaThqsszTd+TEklzZmAdDqvIHWgXIfKqlsoykU1c= + dependencies: + - arrays + - exceptions + - foreign + - foreign-object + - nullable + - prelude + - record + - typelevel-prelude + - variant + spec: + type: registry + version: 7.6.0 + integrity: sha256-+merGdQbL9zWONbnt8S8J9afGJ59MQqGtS0qSd3yu4I= + dependencies: + - aff + - ansi + - arrays + - avar + - bifunctors + - control + - datetime + - effect + - either + - exceptions + - foldable-traversable + - fork + - identity + - integers + - lists + - maybe + - newtype + - now + - ordered-collections + - parallel + - pipes + - prelude + - refs + - strings + - tailrec + - transformers + - tuples st: type: registry version: 6.2.0 @@ -788,3 +1043,16 @@ packages: version: 6.0.0 integrity: sha256-IqIYW4Vkevn8sI+6aUwRGvd87tVL36BBeOr0cGAE7t0= dependencies: [] + variant: + type: registry + version: 8.0.0 + integrity: sha256-SR//zQDg2dnbB8ZHslcxieUkCeNlbMToapvmh9onTtw= + dependencies: + - enums + - lists + - maybe + - partial + - prelude + - record + - tuples + - unsafe-coerce diff --git a/spago.yaml b/spago.yaml index e6afb39..bf3d682 100644 --- a/spago.yaml +++ b/spago.yaml @@ -10,16 +10,16 @@ package: strict: true pedanticPackages: true dependencies: + - foldable-traversable + - node-stream-pipes + - st + - tuples - aff: ">=7.1.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0" - bifunctors: ">=6.0.0 <7.0.0" - - control: ">=6.0.0 <7.0.0" - datetime: ">=6.1.0 <7.0.0" - effect: ">=4.0.0 <5.0.0" - - either: ">=6.1.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0" - - filterable: ">=5.0.0 <6.0.0" - - foldable-traversable: ">=6.0.0 <7.0.0" - foreign: ">=7.0.0 <8.0.0" - foreign-object: ">=4.1.0 <5.0.0" - integers: ">=6.0.0 <7.0.0" @@ -32,11 +32,11 @@ package: - nullable: ">=6.0.0 <7.0.0" - numbers: ">=9.0.1 <10.0.0" - ordered-collections: ">=3.2.0 <4.0.0" + - pipes: ">=8.0.0 <9.0.0" - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" - record: ">=4.0.0 <5.0.0" - record-extra: ">=5.0.1 <6.0.0" - - st: ">=6.2.0 <7.0.0" - strings: ">=6.0.1 <7.0.0" - tailrec: ">=6.1.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0" @@ -46,5 +46,14 @@ package: main: Test.Main dependencies: - console + - gen + - node-fs + - node-zlib + - quickcheck + - simple-json + - spec workspace: - extraPackages: {} + extraPackages: + node-stream-pipes: + git: 'https://git.orionkindel.com/orion/purescript-node-stream-pipes' + ref: 'v1.0.5' diff --git a/src/Node.Stream.CSV.Parse.js b/src/Node.Stream.CSV.Parse.js index 6c6cdf5..a72708c 100644 --- a/src/Node.Stream.CSV.Parse.js +++ b/src/Node.Stream.CSV.Parse.js @@ -1,30 +1,7 @@ -import { parse, Parser } from "csv-parse"; +import { Parser } from "csv-parse"; -class ParserWithColumns extends Parser { - /** @type {Array} */ - columns = []; - /** @type {Map | null} */ - columnsMap = null; -} +/** @type {(s: import('csv-parse').Options) => () => Parser} */ +export const makeImpl = (c) => () => new Parser(c); -/** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */ -export const makeImpl = (c) => () => { - const parser = new ParserWithColumns(c); - parser.once("readable", () => { - parser.columns = parser.read(); - parser.emit("columns", parser.columns); - }); - return parser; -}; - -/** @type {(s: ParserWithColumns) => () => Array | null} */ +/** @type {(s: Parser) => () => Array | null} */ export const readImpl = (p) => () => p.read(); - -/** @type {(s: ParserWithColumns) => () => Array} */ -export const columnsArrayImpl = (p) => () => p.columns; - -/** @type {(s: ParserWithColumns) => () => Map | null} */ -export const columnsMapImpl = (p) => () => p.columnsMap; - -/** @type {(s: ParserWithColumns) => (m: Map) => () => void} */ -export const setColumnsMapImpl = (p) => (m) => () => (p.columnsMap = m); diff --git a/src/Node.Stream.CSV.Parse.purs b/src/Node.Stream.CSV.Parse.purs index a5dbb0e..943f225 100644 --- a/src/Node.Stream.CSV.Parse.purs +++ b/src/Node.Stream.CSV.Parse.purs @@ -2,45 +2,18 @@ module Node.Stream.CSV.Parse where import Prelude hiding (join) -import Control.Alt ((<|>)) -import Control.Monad.Error.Class (liftEither, liftMaybe) -import Control.Monad.Except (runExcept) -import Control.Monad.Except.Trans (catchError) -import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) -import Control.Monad.Rec.Class (untilJust) -import Control.Monad.ST.Class (liftST) -import Control.Monad.ST.Global as ST -import Control.Monad.ST.Ref as STRef -import Control.Monad.Trans.Class (lift) -import Data.Array as Array -import Data.Array.ST as Array.ST -import Data.Bifunctor (lmap) -import Data.CSV.Record (class ReadCSVRecord, readCSVRecord) -import Data.Either (Either(..)) -import Data.Filterable (filter) -import Data.Map (Map) -import Data.Map as Map -import Data.Maybe (Maybe(..), isNothing) -import Data.Newtype (wrap) import Data.Nullable (Nullable) -import Data.Nullable as Nullable import Effect (Effect) -import Effect.Aff (Canceler(..), delay, launchAff_, makeAff) -import Effect.Aff.Class (class MonadAff, liftAff) -import Effect.Class (liftEffect) -import Effect.Exception (error) import Effect.Uncurried (mkEffectFn1) import Foreign (Foreign, unsafeToForeign) import Foreign.Object (Object) -import Foreign.Object as Object -import Node.Encoding (Encoding(..)) +import Foreign.Object (union) as Object +import Node.Buffer (Buffer) import Node.EventEmitter (EventHandle(..)) -import Node.EventEmitter as Event import Node.EventEmitter.UtilTypes (EventHandle1) import Node.Stream (Read, Stream, Write) -import Node.Stream as Stream +import Node.Stream.Object (Transform) as Object import Prim.Row (class Union) -import Prim.RowList (class RowToList) import Unsafe.Coerce (unsafeCoerce) data CSVRead @@ -49,12 +22,9 @@ data CSVRead -- | into parsed purescript objects. -- | -- | The CSV contents may be piped into this stream --- | as Buffer or String encoded chunks. --- | --- | Records can be read with `read` when `Node.Stream.readable` --- | is true. -type CSVParser :: Row Type -> Row Type -> Type -type CSVParser a r = Stream (read :: Read, write :: Write, csv :: CSVRead | r) +-- | as Buffer or String chunks. +type CSVParser :: Row Type -> Type +type CSVParser r = Stream (read :: Read, write :: Write, csv :: CSVRead | r) -- | https://csv.js.org/parse/options/ type Config r = @@ -86,136 +56,22 @@ type Config r = ) -- | Create a CSVParser -make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVParser r ()) +make :: forall @config @missing @extra. Union config missing (Config extra) => { | config } -> Effect (CSVParser ()) make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign --- | Synchronously parse a CSV string -parse - :: forall @r rl @config missing extra m - . MonadAff m - => RowToList r rl - => ReadCSVRecord r rl - => Union config missing (Config extra) - => { | config } - -> String - -> m (Array { | r }) -parse config csv = do - stream <- liftEffect $ make @r @config @missing @extra config - void $ liftEffect $ Stream.writeString stream UTF8 csv - liftEffect $ Stream.end stream - readAll stream - --- | Loop until the stream is closed, invoking the callback with each record as it is parsed. -foreach - :: forall @r rl x m - . MonadAff m - => RowToList r rl - => ReadCSVRecord r rl - => CSVParser r x - -> ({ | r } -> Effect Unit) - -> m Unit -foreach stream cb = do - alreadyHaveCols <- liftEffect $ getOrInitColumnsMap stream - when (isNothing alreadyHaveCols) - $ liftAff - $ makeAff \res -> do - stop <- flip (Event.once columnsH) stream $ const do - void $ getOrInitColumnsMap stream - res $ Right unit - pure $ Canceler $ const $ liftEffect stop - - liftAff $ makeAff \res -> do - count <- ST.toEffect $ STRef.new 0 - removeDataListener <- flip (Event.on dataH) stream \row -> - void - $ flip catchError (res <<< Left) - $ do - void $ ST.toEffect $ STRef.modify (_ + 1) count - cols <- liftMaybe (error "unreachable") =<< getOrInitColumnsMap stream - record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row - flip catchError (liftEffect <<< res <<< Left) (cb record) - void $ ST.toEffect $ STRef.modify (_ - 1) count - - removeErrorListener <- flip (Event.once Stream.errorH) stream (res <<< Left) - removeEndListener <- flip (Event.once Stream.endH) stream $ launchAff_ do - untilJust do - delay $ wrap 10.0 - ct <- liftEffect $ ST.toEffect $ STRef.read count - pure $ if ct <= 1 then Just unit else Nothing - liftEffect $ res $ Right unit - - pure $ Canceler $ const $ liftEffect do - removeDataListener - removeEndListener - removeErrorListener - --- | Reads a parsed record from the stream. --- | --- | Returns `Nothing` when either: --- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`) --- | - All records have been processed (`Node.Stream.closed` is `true`) -read - :: forall @r rl a - . RowToList r rl - => ReadCSVRecord r rl - => CSVParser r a - -> Effect (Maybe { | r }) -read stream = runMaybeT do - cols <- MaybeT $ getOrInitColumnsMap stream - raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream - liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw - --- | Collect all parsed records into an array -readAll - :: forall @r rl a m - . MonadAff m - => RowToList r rl - => ReadCSVRecord r rl - => CSVParser r a - -> m (Array { | r }) -readAll stream = do - records <- liftEffect $ liftST $ Array.ST.new - foreach stream $ void <<< liftEffect <<< liftST <<< flip Array.ST.push records - liftEffect $ liftST $ Array.ST.unsafeFreeze records +toObjectStream :: CSVParser () -> Object.Transform Buffer (Array String) +toObjectStream = unsafeCoerce -- | `data` event. Emitted when a CSV record has been parsed. -dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String) +dataH :: forall a. EventHandle1 (CSVParser a) (Array String) dataH = EventHandle "data" mkEffectFn1 --- | `columns` event. Emitted when the header row has been parsed. -columnsH :: forall r a. EventHandle1 (CSVParser r a) (Array String) -columnsH = EventHandle "columns" mkEffectFn1 - -- | FFI foreign import makeImpl :: forall r. Foreign -> Effect (Stream r) -- | FFI foreign import readImpl :: forall r. Stream r -> Effect (Nullable (Array String)) --- | FFI -foreign import columnsArrayImpl :: forall r. Stream r -> Effect (Array String) - --- | FFI -foreign import columnsMapImpl :: forall r. Stream r -> Effect (Nullable (Map String Int)) - --- | FFI -foreign import setColumnsMapImpl :: forall r. Stream r -> Map String Int -> Effect Unit - --- | FFI -getOrInitColumnsMap :: forall r x. CSVParser r x -> Effect (Maybe (Map String Int)) -getOrInitColumnsMap s = runMaybeT do - cols :: Array String <- MaybeT $ filter (not <<< Array.null) <$> Just <$> columnsArrayImpl s - let - get = MaybeT $ Nullable.toMaybe <$> columnsMapImpl s - init = do - let - ixs = Array.range 0 (Array.length cols - 1) - assoc = Array.zip cols ixs - map = Map.fromFoldable assoc - lift $ setColumnsMapImpl s map - pure map - get <|> init - -- | FFI recordToForeign :: forall r. Record r -> Object Foreign recordToForeign = unsafeCoerce diff --git a/src/Node.Stream.CSV.Stringify.purs b/src/Node.Stream.CSV.Stringify.purs index bb527c2..2a3a605 100644 --- a/src/Node.Stream.CSV.Stringify.purs +++ b/src/Node.Stream.CSV.Stringify.purs @@ -2,30 +2,16 @@ module Node.Stream.CSV.Stringify where import Prelude -import Control.Monad.Rec.Class (class MonadRec, whileJust) -import Control.Monad.ST.Global as ST -import Data.Array as Array -import Data.Array.ST as Array.ST import Data.CSV.Record (class WriteCSVRecord, writeCSVRecord) -import Data.Either (Either(..), blush) -import Data.Foldable (class Foldable, fold) -import Data.Maybe (Maybe(..)) import Data.String.Regex (Regex) -import Data.Traversable (for_) import Effect (Effect) -import Effect.Aff (Canceler(..), makeAff) -import Effect.Aff.Class (class MonadAff, liftAff) -import Effect.Class (liftEffect) import Foreign (Foreign, unsafeToForeign) import Foreign.Object (Object) -import Foreign.Object as Object -import Node.EventEmitter as Event +import Foreign.Object (union) as Object import Node.Stream (Read, Stream, Write) -import Node.Stream as Stream +import Node.Stream.Object (Transform) as Object import Prim.Row (class Union) import Prim.RowList (class RowToList) -import Record.Extra (class Keys, keys) -import Type.Prelude (Proxy(..)) import Unsafe.Coerce (unsafeCoerce) data CSVWrite @@ -38,8 +24,8 @@ data CSVWrite -- | Stringified rows are emitted on the `Readable` end as string -- | chunks, meaning it can be treated as a `Node.Stream.Readable` -- | that has had `setEncoding UTF8` invoked on it. -type CSVStringifier :: Row Type -> Row Type -> Type -type CSVStringifier a r = Stream (read :: Read, write :: Write, csv :: CSVWrite | r) +type CSVStringifier :: Row Type -> Type +type CSVStringifier r = Stream (read :: Read, write :: Write, csv :: CSVWrite | r) -- | https://csv.js.org/stringify/options/ type Config r = @@ -48,7 +34,6 @@ type Config r = , record_delimiter :: String , escape :: String , escape_formulas :: Boolean - , header :: Boolean , quote :: String , quoted :: Boolean , quoted_empty :: Boolean @@ -63,43 +48,36 @@ foreign import writeImpl :: forall r. Stream r -> Array String -> Effect Unit recordToForeign :: forall r. Record r -> Object Foreign recordToForeign = unsafeCoerce --- | Create a CSVStringifier -make :: forall @r rl @config @missing @extra. Keys rl => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVStringifier r ()) -make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: Array.fromFoldable $ keys (Proxy @r) }) <<< recordToForeign +-- | Create a raw Transform stream that accepts chunks of `Array String`, +-- | and transforms them into string CSV rows. +-- | +-- | Requires an ordered array of column names. +make + :: forall @config @missing @extra + . Union config missing (Config extra) + => Array String + -> { | config } + -> Effect (CSVStringifier ()) +make columns = + makeImpl + <<< unsafeToForeign + <<< Object.union (recordToForeign { columns, header: true }) + <<< recordToForeign --- | Synchronously stringify a collection of records -stringify :: forall @r rl f m @config missing extra. MonadAff m => MonadRec m => Keys rl => Foldable f => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> f { | r } -> m String -stringify config records = do - stream <- liftEffect $ make @r @config @missing @extra config - liftEffect $ for_ records \r -> write stream r - liftEffect $ Stream.end stream - readAll stream +-- | Convert the raw stream to a typed ObjectStream +toObjectStream :: CSVStringifier () -> Object.Transform (Array String) String +toObjectStream = unsafeCoerce -- | Write a record to a CSVStringifier. -- | -- | The record will be emitted on the `Readable` end -- | of the stream as a string chunk. -write :: forall @r rl a. RowToList r rl => WriteCSVRecord r rl => CSVStringifier r a -> { | r } -> Effect Unit +write :: forall @r rl a. RowToList r rl => WriteCSVRecord r rl => CSVStringifier a -> { | r } -> Effect Unit write s = writeImpl s <<< writeCSVRecord @r @rl --- | Loop until the stream is closed, invoking the callback with each chunk of stringified CSV text. -foreach :: forall m r x. MonadAff m => MonadRec m => CSVStringifier r x -> (String -> m Unit) -> m Unit -foreach stream cb = whileJust do - isReadable <- liftEffect $ Stream.readable stream - liftAff $ when (not isReadable) $ makeAff \res -> do - stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit - pure $ Canceler $ const $ liftEffect stop - whileJust do - s <- liftEffect $ (join <<< map blush) <$> Stream.readEither stream - for_ s cb - pure $ void s - isClosed <- liftEffect $ Stream.closed stream - pure $ if isClosed then Nothing else Just unit - --- | Read the stringified chunks until end-of-stream, returning the entire CSV string. -readAll :: forall r a m. MonadAff m => MonadRec m => CSVStringifier r a -> m String -readAll stream = do - chunks <- liftEffect $ ST.toEffect $ Array.ST.new - foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push chunks - chunks' <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze chunks - pure $ fold chunks' +-- | Write a record to a CSVStringifier. +-- | +-- | The record will be emitted on the `Readable` end +-- | of the stream as a string chunk. +writeRaw :: forall a. CSVStringifier a -> Array String -> Effect Unit +writeRaw = writeImpl diff --git a/src/Pipes.CSV.purs b/src/Pipes.CSV.purs new file mode 100644 index 0000000..9ac747c --- /dev/null +++ b/src/Pipes.CSV.purs @@ -0,0 +1,98 @@ +module Pipes.CSV where + +import Prelude + +import Control.Monad.Error.Class (liftEither) +import Control.Monad.Except (runExcept) +import Control.Monad.Rec.Class (forever) +import Control.Monad.ST.Global as ST +import Control.Monad.ST.Ref as STRef +import Data.Array as Array +import Data.Bifunctor (lmap) +import Data.CSV.Record (class ReadCSVRecord, class WriteCSVRecord, readCSVRecord, writeCSVRecord) +import Data.FunctorWithIndex (mapWithIndex) +import Data.Map as Map +import Data.Maybe (Maybe(..)) +import Data.Tuple.Nested ((/\)) +import Effect.Aff (Aff) +import Effect.Class (liftEffect) +import Effect.Exception (error) +import Node.Buffer (Buffer) +import Node.Stream.CSV.Parse as CSV.Parse +import Node.Stream.CSV.Stringify as CSV.Stringify +import Pipes (await, yield, (>->)) +import Pipes.Core (Pipe) +import Pipes.Node.Stream as Pipes.Stream +import Prim.RowList (class RowToList) +import Record.Extra (class Keys, keys) +import Type.Prelude (Proxy(..)) + +-- | Transforms buffer chunks of a CSV file to parsed +-- | records of `r`. +-- | +-- | ``` +-- | -- == my-data.csv.gz == +-- | -- id,foo,is_deleted +-- | -- 1,hi,f +-- | -- 2,bye,t +-- | +-- | rows +-- | :: Array {id :: Int, foo :: String, is_deleted :: Boolean} +-- | <- map Array.fromFoldable +-- | $ Pipes.toListM +-- | $ Pipes.Node.Stream.unEOS +-- | $ Pipes.Node.FS.read "my-data.csv.gz" +-- | >-> Pipes.Node.Zlib.gunzip +-- | >-> Pipes.CSV.parse +-- | rows `shouldEqual` [{id: 1, foo: "hi", is_deleted: false}, {id: 2, foo: "bye", is_deleted: true}] +-- | ``` +parse + :: forall @r rl + . RowToList r rl + => ReadCSVRecord r rl + => Pipe (Maybe Buffer) (Maybe { | r }) Aff Unit +parse = do + raw <- liftEffect $ CSV.Parse.make {} + colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing + + let + readCols = liftEffect $ ST.toEffect $ STRef.read colsST + putCols a = void $ liftEffect $ ST.toEffect $ STRef.write (Just a) colsST + + parse' a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a + firstRow a = putCols $ Map.fromFoldable $ mapWithIndex (flip (/\)) a + row a cols' = yield =<< parse' a cols' + unmarshal = forever do + r <- await + cols <- readCols + case cols of + Just cols' -> row r cols' + Nothing -> firstRow r + + parser = Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream raw + parser >-> Pipes.Stream.inEOS unmarshal + +-- | Transforms buffer chunks of a CSV file to parsed +-- | arrays of CSV values. +parseRaw :: Pipe (Maybe Buffer) (Maybe (Array String)) Aff Unit +parseRaw = do + s <- liftEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {} + Pipes.Stream.fromTransform s + +-- | Transforms CSV rows into stringified CSV records +-- | using the given ordered array of column names. +stringifyRaw :: Array String -> Pipe (Maybe (Array String)) (Maybe String) Aff Unit +stringifyRaw columns = do + s <- liftEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {} + Pipes.Stream.fromTransform s + +-- | Transforms purescript records into stringified CSV records. +-- | +-- | Columns are inferred from the record's keys, ordered alphabetically. +stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => Pipe (Maybe { | r }) (Maybe String) Aff Unit +stringify = do + raw <- liftEffect $ CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {} + let + printer = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream raw + marshal = forever $ yield =<< (writeCSVRecord @r @rl <$> await) + Pipes.Stream.inEOS marshal >-> printer diff --git a/test/Test/Main.purs b/test/Test/Main.purs index e616930..9c56e51 100644 --- a/test/Test/Main.purs +++ b/test/Test/Main.purs @@ -2,11 +2,13 @@ module Test.Main where import Prelude +import Data.Maybe (Maybe(..)) import Effect (Effect) -import Effect.Class.Console (log) +import Effect.Aff (launchAff_) +import Test.Pipes.CSV as Test.Pipes.CSV +import Test.Spec.Reporter (specReporter) +import Test.Spec.Runner (defaultConfig, runSpec') main :: Effect Unit -main = do - log "🍕" - log "You should add some tests." - +main = launchAff_ $ runSpec' (defaultConfig { timeout = Nothing }) [ specReporter ] do + Test.Pipes.CSV.spec diff --git a/test/Test/Pipes.CSV.purs b/test/Test/Pipes.CSV.purs new file mode 100644 index 0000000..13a71c1 --- /dev/null +++ b/test/Test/Pipes.CSV.purs @@ -0,0 +1,88 @@ +module Test.Pipes.CSV where + +import Prelude + +import Control.Monad.Gen (chooseInt) +import Control.Monad.Rec.Class (Step(..), tailRecM) +import Data.Array as Array +import Data.DateTime (DateTime) +import Data.Foldable (fold) +import Data.Maybe (Maybe(..), fromJust) +import Data.Newtype (wrap) +import Data.PreciseDateTime (fromRFC3339String, toDateTimeLossy) +import Effect.Class (liftEffect) +import Node.Encoding (Encoding(..)) +import Partial.Unsafe (unsafePartial) +import Pipes (yield, (>->)) +import Pipes (each) as Pipes +import Pipes.CSV as Pipes.CSV +import Pipes.Collect as Pipes.Collect +import Pipes.Node.Buffer as Pipes.Buffer +import Pipes.Node.Stream as Pipes.Stream +import Pipes.Prelude (map, toListM) as Pipes +import Pipes.Util as Pipes.Util +import Test.QuickCheck.Gen (randomSample') +import Test.Spec (Spec, describe, it) +import Test.Spec.Assertions (shouldEqual) + +csv :: String +csv = """created,flag,foo,id +2020-01-01T00:00:00.0Z,true,a,1 +2024-02-02T08:00:00.0Z,false,apple,2 +1970-01-01T00:00:00.0Z,true,hello,3 +""" + +dt :: String -> DateTime +dt = toDateTimeLossy <<< unsafePartial fromJust <<< fromRFC3339String <<< wrap + +spec :: Spec Unit +spec = + describe "Pipes.CSV" do + it "stringify" do + let + objs = + [ {id: 1, foo: "a", flag: true, created: dt "2020-01-01T00:00:00Z"} + , {id: 2, foo: "apple", flag: false, created: dt "2024-02-02T08:00:00Z"} + , {id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z"} + ] + + csv' <- map fold $ Pipes.Collect.collectArray $ Pipes.Stream.withEOS (Pipes.each objs) >-> Pipes.CSV.stringify >-> Pipes.Stream.unEOS + csv' `shouldEqual` csv + describe "parse" do + it "parses csv" do + rows <- map Array.fromFoldable + $ Pipes.toListM + $ Pipes.Stream.withEOS (yield csv) + >-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8) + >-> Pipes.CSV.parse + >-> Pipes.Stream.unEOS + + rows `shouldEqual` + [ {id: 1, foo: "a", flag: true, created: dt "2020-01-01T00:00:00Z"} + , {id: 2, foo: "apple", flag: false, created: dt "2024-02-02T08:00:00Z"} + , {id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z"} + ] + it "parses large csv" do + nums <- liftEffect $ randomSample' 100000 (chooseInt 0 9) + let + csvRows = ["id\n"] <> ((_ <> "\n") <$> show <$> nums) + csv' = + let + go ix + | Just a <- Array.index csvRows ix = yield a $> Loop (ix + 1) + | otherwise = pure $ Done unit + in + tailRecM go 0 + in16kbChunks = + Pipes.Util.chunked 16000 + >-> Pipes.Stream.inEOS (Pipes.map fold) + >-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8) + + rows <- + Pipes.Collect.collectArray + $ Pipes.Stream.withEOS csv' + >-> in16kbChunks + >-> Pipes.CSV.parse + >-> Pipes.Stream.unEOS + + rows `shouldEqual` ((\id -> {id}) <$> nums)