From e21260cd2c80b244e3f7e4f4a23d4b3f7fe5e90f Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Fri, 3 May 2024 10:41:33 -0500 Subject: [PATCH] feat: Parser.foreach should concurrently process records as they are read User-provided callback no longer blocks subsequent reads --- bun/fmt.js | 32 +++++++++++----------- bun/prepare.js | 44 +++++++++++++++--------------- spago.lock | 9 ++++++ spago.yaml | 2 ++ src/Data.CSV.Record.purs | 11 ++++---- src/Node.Stream.CSV.Parse.js | 26 +++++++++--------- src/Node.Stream.CSV.Parse.purs | 26 +++++++++++------- src/Node.Stream.CSV.Stringify.js | 6 ++-- src/Node.Stream.CSV.Stringify.purs | 2 +- 9 files changed, 88 insertions(+), 70 deletions(-) diff --git a/bun/fmt.js b/bun/fmt.js index 04b5c40..29d0fe0 100644 --- a/bun/fmt.js +++ b/bun/fmt.js @@ -1,27 +1,27 @@ /** @type {(parser: string, ps: string[]) => import("bun").Subprocess} */ const prettier = (parser, ps) => - Bun.spawn(['bun', 'x', 'prettier', '--write', '--parser', parser, ...ps], { - stdout: 'inherit', - stderr: 'inherit', - }) + Bun.spawn(["bun", "x", "prettier", "--write", "--parser", parser, ...ps], { + stdout: "inherit", + stderr: "inherit", + }); const procs = [ - prettier('babel', ['./src/**/*.js', './bun/**/*.js', './.prettierrc.cjs']), - prettier('json', ['./package.json', './jsconfig.json']), + prettier("babel", ["./src/**/*.js", "./bun/**/*.js", "./.prettierrc.cjs"]), + prettier("json", ["./package.json", "./jsconfig.json"]), Bun.spawn( [ - 'bun', - 'x', - 'purs-tidy', - 'format-in-place', - 'src/**/*.purs', - 'test/**/*.purs', + "bun", + "x", + "purs-tidy", + "format-in-place", + "src/**/*.purs", + "test/**/*.purs", ], { - stdout: 'inherit', - stderr: 'inherit', + stdout: "inherit", + stderr: "inherit", }, ), -] +]; -await Promise.all(procs.map(p => p.exited)) +await Promise.all(procs.map((p) => p.exited)); diff --git a/bun/prepare.js b/bun/prepare.js index 8abdfb1..c64ce90 100644 --- a/bun/prepare.js +++ b/bun/prepare.js @@ -1,34 +1,34 @@ -import { readFile, writeFile } from 'fs/promises' -import { execSync } from 'child_process' +import { readFile, writeFile } from "fs/promises"; +import { execSync } from "child_process"; -let ver = process.argv[2] +let ver = process.argv[2]; if (!ver) { - console.error(`tag required: bun bun/prepare.js v1.0.0`) - process.exit(1) + console.error(`tag required: bun bun/prepare.js v1.0.0`); + process.exit(1); } else if (!/v\d+\.\d+\.\d+/.test(ver)) { - console.error(`invalid tag: ${ver}`) - process.exit(1) + console.error(`invalid tag: ${ver}`); + process.exit(1); } -ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || '' +ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || ""; -const pkg = await readFile('./package.json', 'utf8') -const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`) -await writeFile('./package.json', pkgnew) +const pkg = await readFile("./package.json", "utf8"); +const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`); +await writeFile("./package.json", pkgnew); -const spago = await readFile('./spago.yaml', 'utf8') -const spagonew = spago.replace(/version: .+/, `version: '${ver}'`) -await writeFile('./spago.yaml', spagonew) +const spago = await readFile("./spago.yaml", "utf8"); +const spagonew = spago.replace(/version: .+/, `version: '${ver}'`); +await writeFile("./spago.yaml", spagonew); -const readme = await readFile('./README.md', 'utf8') +const readme = await readFile("./README.md", "utf8"); const readmenew = readme.replace( /packages\/purescript-csv-stream\/.+?\//g, `/packages/purescript-csv-stream/${ver}/`, -) -await writeFile('./README.md', readmenew) +); +await writeFile("./README.md", readmenew); -execSync(`git add spago.yaml package.json README.md`) -execSync(`git commit -m 'chore: prepare v${ver}'`) -execSync(`git tag v${ver}`) -execSync(`git push --tags`) -execSync(`git push --mirror github-mirror`) +execSync(`git add spago.yaml package.json README.md`); +execSync(`git commit -m 'chore: prepare v${ver}'`); +execSync(`git tag v${ver}`); +execSync(`git push --tags`); +execSync(`git push --mirror github-mirror`); diff --git a/spago.lock b/spago.lock index 4543e72..61b5b79 100644 --- a/spago.lock +++ b/spago.lock @@ -15,6 +15,7 @@ workspace: - foldable-traversable: ">=6.0.0 <7.0.0" - foreign: ">=7.0.0 <8.0.0" - foreign-object: ">=4.1.0 <5.0.0" + - fork - integers: ">=6.0.0 <7.0.0" - lists: ">=7.0.0 <8.0.0" - maybe: ">=6.0.0 <7.0.0" @@ -25,6 +26,7 @@ workspace: - nullable: ">=6.0.0 <7.0.0" - numbers: ">=9.0.1 <10.0.0" - ordered-collections: ">=3.2.0 <4.0.0" + - parallel - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" - record: ">=4.0.0 <5.0.0" @@ -59,6 +61,7 @@ workspace: - foldable-traversable - foreign - foreign-object + - fork - formatters - functions - functors @@ -339,6 +342,12 @@ packages: - tuples - typelevel-prelude - unfoldable + fork: + type: registry + version: 6.0.0 + integrity: sha256-X7u0SuCvFbLbzuNEKLBNuWjmcroqMqit4xEzpQwAP7E= + dependencies: + - aff formatters: type: registry version: 7.0.0 diff --git a/spago.yaml b/spago.yaml index f8bb931..40568d7 100644 --- a/spago.yaml +++ b/spago.yaml @@ -22,6 +22,7 @@ package: - foldable-traversable: ">=6.0.0 <7.0.0" - foreign: ">=7.0.0 <8.0.0" - foreign-object: ">=4.1.0 <5.0.0" + - fork: ">=6.0.0 <7.0.0" - integers: ">=6.0.0 <7.0.0" - lists: ">=7.0.0 <8.0.0" - maybe: ">=6.0.0 <7.0.0" @@ -32,6 +33,7 @@ package: - nullable: ">=6.0.0 <7.0.0" - numbers: ">=9.0.1 <10.0.0" - ordered-collections: ">=3.2.0 <4.0.0" + - parallel: ">=6.0.0 <7.0.0" - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" - record: ">=4.0.0 <5.0.0" diff --git a/src/Data.CSV.Record.purs b/src/Data.CSV.Record.purs index 4cdf3b5..e57c5ef 100644 --- a/src/Data.CSV.Record.purs +++ b/src/Data.CSV.Record.purs @@ -22,11 +22,12 @@ class RowToList r rl <= WriteCSVRecord r rl | rl -> r where writeCSVRecord :: { | r } -> Array String instance (RowToList r (Cons k v tailrl), IsSymbol k, WriteCSV v, Lacks k tail, Cons k v tail r, WriteCSVRecord tail tailrl) => WriteCSVRecord r (Cons k v tailrl) where - writeCSVRecord r = let - val = writeCSV $ Record.get (Proxy @k) r - tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r - in - [val] <> tail + writeCSVRecord r = + let + val = writeCSV $ Record.get (Proxy @k) r + tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r + in + [ val ] <> tail instance WriteCSVRecord () Nil where writeCSVRecord _ = [] diff --git a/src/Node.Stream.CSV.Parse.js b/src/Node.Stream.CSV.Parse.js index 2c11e46..8109df9 100644 --- a/src/Node.Stream.CSV.Parse.js +++ b/src/Node.Stream.CSV.Parse.js @@ -1,29 +1,29 @@ -import {parse, Parser} from 'csv-parse' +import { parse, Parser } from "csv-parse"; class ParserWithColumns extends Parser { /** @type {Array} */ - columns = [] + columns = []; /** @type {Map | null} */ - columnsMap = null + columnsMap = null; } /** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */ -export const makeImpl = c => () => { - const parser = new ParserWithColumns(c) - parser.once('readable', () => { +export const makeImpl = (c) => () => { + const parser = new ParserWithColumns(c); + parser.once("readable", () => { parser.columns = parser.read(); - }) - return parser -} + }); + return parser; +}; /** @type {(s: ParserWithColumns) => () => Array | null} */ -export const readImpl = p => () => p.read(); +export const readImpl = (p) => () => p.read(); /** @type {(s: ParserWithColumns) => () => Array} */ -export const columnsArrayImpl = p => () => p.columns +export const columnsArrayImpl = (p) => () => p.columns; /** @type {(s: ParserWithColumns) => () => Map | null} */ -export const columnsMapImpl = p => () => p.columnsMap +export const columnsMapImpl = (p) => () => p.columnsMap; /** @type {(s: ParserWithColumns) => (m: Map) => () => void} */ -export const setColumnsMapImpl = p => m => () => p.columnsMap = m +export const setColumnsMapImpl = (p) => (m) => () => (p.columnsMap = m); diff --git a/src/Node.Stream.CSV.Parse.purs b/src/Node.Stream.CSV.Parse.purs index 6f6d80d..3817912 100644 --- a/src/Node.Stream.CSV.Parse.purs +++ b/src/Node.Stream.CSV.Parse.purs @@ -1,14 +1,17 @@ module Node.Stream.CSV.Parse where -import Prelude +import Prelude hiding (join) import Control.Alt ((<|>)) +import Control.Alternative (class Alternative) import Control.Monad.Error.Class (liftEither) import Control.Monad.Except (runExcept) +import Control.Monad.Fork.Class (class MonadFork, fork, join) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) -import Control.Monad.Rec.Class (class MonadRec, whileJust) +import Control.Monad.Rec.Class (class MonadRec, tailRecM, Step(..), whileJust) import Control.Monad.ST.Global as ST import Control.Monad.Trans.Class (lift) +import Control.Parallel (class Parallel, parTraverse) import Data.Array as Array import Data.Array.ST as Array.ST import Data.Bifunctor (lmap) @@ -21,7 +24,6 @@ import Data.Maybe (Maybe(..)) import Data.Newtype (wrap) import Data.Nullable (Nullable) import Data.Nullable as Nullable -import Data.Traversable (for_) import Effect (Effect) import Effect.Aff (Canceler(..), delay, makeAff) import Effect.Aff.Class (class MonadAff, liftAff) @@ -85,10 +87,10 @@ type Config r = -- | Create a CSVParser make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVParser r ()) -make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign {columns: false, cast: false, cast_date: false}) <<< recordToForeign +make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign -- | Synchronously parse a CSV string -parse :: forall @r rl @config missing extra m. MonadAff m => MonadRec m => RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> String -> m (Array { | r }) +parse :: forall @r rl @config missing extra m p f. Parallel p m => MonadFork f m => Alternative p => MonadAff m => MonadRec m => RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> String -> m (Array { | r }) parse config csv = do stream <- liftEffect $ make @r @config @missing @extra config void $ liftEffect $ Stream.writeString stream UTF8 csv @@ -96,17 +98,21 @@ parse config csv = do readAll stream -- | Loop until the stream is closed, invoking the callback with each record as it is parsed. -foreach :: forall @r rl x m. MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> m Unit) -> m Unit +foreach :: forall @r rl x m f p. Parallel p m => Alternative p => MonadFork f m => MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> m Unit) -> m Unit foreach stream cb = whileJust do liftAff $ delay $ wrap 0.0 isReadable <- liftEffect $ Stream.readable stream liftAff $ when (not isReadable) $ makeAff \res -> do stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit pure $ Canceler $ const $ liftEffect stop - whileJust do + fibers <- flip tailRecM [] \fibers -> do r <- liftEffect $ read @r stream - for_ r cb - pure $ void r + case r of + Just r' -> do + f <- fork (cb r') + pure $ Loop $ fibers <> [ f ] + Nothing -> pure $ Done fibers + void $ parTraverse join fibers isClosed <- liftEffect $ Stream.closed stream pure $ if isClosed then Nothing else Just unit @@ -122,7 +128,7 @@ read stream = runMaybeT do liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw -- | Collect all parsed records into an array -readAll :: forall @r rl a m. MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> m (Array { | r }) +readAll :: forall @r rl a m p f. Parallel p m => MonadFork f m => Alternative p => MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> m (Array { | r }) readAll stream = do records <- liftEffect $ ST.toEffect $ Array.ST.new foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records diff --git a/src/Node.Stream.CSV.Stringify.js b/src/Node.Stream.CSV.Stringify.js index bf3a296..99e4ed7 100644 --- a/src/Node.Stream.CSV.Stringify.js +++ b/src/Node.Stream.CSV.Stringify.js @@ -1,7 +1,7 @@ -import {stringify} from 'csv-stringify' +import { stringify } from "csv-stringify"; /** @type {(c: import('csv-stringify').Options) => () => import('csv-stringify').Stringifier} */ -export const makeImpl = c => () => stringify(c) +export const makeImpl = (c) => () => stringify(c); /** @type {(s: import('csv-stringify').Stringifier) => (vals: Array) => () => void} */ -export const writeImpl = s => vals => () => s.write(vals) +export const writeImpl = (s) => (vals) => () => s.write(vals); diff --git a/src/Node.Stream.CSV.Stringify.purs b/src/Node.Stream.CSV.Stringify.purs index 06dc4eb..bb527c2 100644 --- a/src/Node.Stream.CSV.Stringify.purs +++ b/src/Node.Stream.CSV.Stringify.purs @@ -65,7 +65,7 @@ recordToForeign = unsafeCoerce -- | Create a CSVStringifier make :: forall @r rl @config @missing @extra. Keys rl => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> Effect (CSVStringifier r ()) -make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign {columns: Array.fromFoldable $ keys (Proxy @r)}) <<< recordToForeign +make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: Array.fromFoldable $ keys (Proxy @r) }) <<< recordToForeign -- | Synchronously stringify a collection of records stringify :: forall @r rl f m @config missing extra. MonadAff m => MonadRec m => Keys rl => Foldable f => RowToList r rl => WriteCSVRecord r rl => Union config missing (Config extra) => { | config } -> f { | r } -> m String