feat: Parser.foreach should concurrently process records as they are read

User-provided callback no longer blocks subsequent reads
This commit is contained in:
orion 2024-05-03 10:41:33 -05:00
parent 02090c3129
commit e21260cd2c
Signed by: orion
GPG Key ID: 6D4165AE4C928719
9 changed files with 88 additions and 70 deletions

View File

@ -1,27 +1,27 @@
/** @type {(parser: string, ps: string[]) => import("bun").Subprocess} */ /** @type {(parser: string, ps: string[]) => import("bun").Subprocess} */
const prettier = (parser, ps) => const prettier = (parser, ps) =>
Bun.spawn(['bun', 'x', 'prettier', '--write', '--parser', parser, ...ps], { Bun.spawn(["bun", "x", "prettier", "--write", "--parser", parser, ...ps], {
stdout: 'inherit', stdout: "inherit",
stderr: 'inherit', stderr: "inherit",
}) });
const procs = [ const procs = [
prettier('babel', ['./src/**/*.js', './bun/**/*.js', './.prettierrc.cjs']), prettier("babel", ["./src/**/*.js", "./bun/**/*.js", "./.prettierrc.cjs"]),
prettier('json', ['./package.json', './jsconfig.json']), prettier("json", ["./package.json", "./jsconfig.json"]),
Bun.spawn( Bun.spawn(
[ [
'bun', "bun",
'x', "x",
'purs-tidy', "purs-tidy",
'format-in-place', "format-in-place",
'src/**/*.purs', "src/**/*.purs",
'test/**/*.purs', "test/**/*.purs",
], ],
{ {
stdout: 'inherit', stdout: "inherit",
stderr: 'inherit', stderr: "inherit",
}, },
), ),
] ];
await Promise.all(procs.map(p => p.exited)) await Promise.all(procs.map((p) => p.exited));

View File

@ -1,34 +1,34 @@
import { readFile, writeFile } from 'fs/promises' import { readFile, writeFile } from "fs/promises";
import { execSync } from 'child_process' import { execSync } from "child_process";
let ver = process.argv[2] let ver = process.argv[2];
if (!ver) { if (!ver) {
console.error(`tag required: bun bun/prepare.js v1.0.0`) console.error(`tag required: bun bun/prepare.js v1.0.0`);
process.exit(1) process.exit(1);
} else if (!/v\d+\.\d+\.\d+/.test(ver)) { } else if (!/v\d+\.\d+\.\d+/.test(ver)) {
console.error(`invalid tag: ${ver}`) console.error(`invalid tag: ${ver}`);
process.exit(1) process.exit(1);
} }
ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || '' ver = (/\d+\.\d+\.\d+/.exec(ver) || [])[0] || "";
const pkg = await readFile('./package.json', 'utf8') const pkg = await readFile("./package.json", "utf8");
const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`) const pkgnew = pkg.replace(/"version": ".+"/, `"version": "v${ver}"`);
await writeFile('./package.json', pkgnew) await writeFile("./package.json", pkgnew);
const spago = await readFile('./spago.yaml', 'utf8') const spago = await readFile("./spago.yaml", "utf8");
const spagonew = spago.replace(/version: .+/, `version: '${ver}'`) const spagonew = spago.replace(/version: .+/, `version: '${ver}'`);
await writeFile('./spago.yaml', spagonew) await writeFile("./spago.yaml", spagonew);
const readme = await readFile('./README.md', 'utf8') const readme = await readFile("./README.md", "utf8");
const readmenew = readme.replace( const readmenew = readme.replace(
/packages\/purescript-csv-stream\/.+?\//g, /packages\/purescript-csv-stream\/.+?\//g,
`/packages/purescript-csv-stream/${ver}/`, `/packages/purescript-csv-stream/${ver}/`,
) );
await writeFile('./README.md', readmenew) await writeFile("./README.md", readmenew);
execSync(`git add spago.yaml package.json README.md`) execSync(`git add spago.yaml package.json README.md`);
execSync(`git commit -m 'chore: prepare v${ver}'`) execSync(`git commit -m 'chore: prepare v${ver}'`);
execSync(`git tag v${ver}`) execSync(`git tag v${ver}`);
execSync(`git push --tags`) execSync(`git push --tags`);
execSync(`git push --mirror github-mirror`) execSync(`git push --mirror github-mirror`);

View File

@ -15,6 +15,7 @@ workspace:
- foldable-traversable: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0" - foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0" - foreign-object: ">=4.1.0 <5.0.0"
- fork
- integers: ">=6.0.0 <7.0.0" - integers: ">=6.0.0 <7.0.0"
- lists: ">=7.0.0 <8.0.0" - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
@ -25,6 +26,7 @@ workspace:
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0" - numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- parallel
- precise-datetime: ">=7.0.0 <8.0.0" - precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"
@ -59,6 +61,7 @@ workspace:
- foldable-traversable - foldable-traversable
- foreign - foreign
- foreign-object - foreign-object
- fork
- formatters - formatters
- functions - functions
- functors - functors
@ -339,6 +342,12 @@ packages:
- tuples - tuples
- typelevel-prelude - typelevel-prelude
- unfoldable - unfoldable
fork:
type: registry
version: 6.0.0
integrity: sha256-X7u0SuCvFbLbzuNEKLBNuWjmcroqMqit4xEzpQwAP7E=
dependencies:
- aff
formatters: formatters:
type: registry type: registry
version: 7.0.0 version: 7.0.0

View File

@ -22,6 +22,7 @@ package:
- foldable-traversable: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0" - foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0" - foreign-object: ">=4.1.0 <5.0.0"
- fork: ">=6.0.0 <7.0.0"
- integers: ">=6.0.0 <7.0.0" - integers: ">=6.0.0 <7.0.0"
- lists: ">=7.0.0 <8.0.0" - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
@ -32,6 +33,7 @@ package:
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0" - numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0" - precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"

View File

@ -22,7 +22,8 @@ class RowToList r rl <= WriteCSVRecord r rl | rl -> r where
writeCSVRecord :: { | r } -> Array String writeCSVRecord :: { | r } -> Array String
instance (RowToList r (Cons k v tailrl), IsSymbol k, WriteCSV v, Lacks k tail, Cons k v tail r, WriteCSVRecord tail tailrl) => WriteCSVRecord r (Cons k v tailrl) where instance (RowToList r (Cons k v tailrl), IsSymbol k, WriteCSV v, Lacks k tail, Cons k v tail r, WriteCSVRecord tail tailrl) => WriteCSVRecord r (Cons k v tailrl) where
writeCSVRecord r = let writeCSVRecord r =
let
val = writeCSV $ Record.get (Proxy @k) r val = writeCSV $ Record.get (Proxy @k) r
tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r tail = writeCSVRecord @tail @tailrl $ Record.delete (Proxy @k) r
in in

View File

@ -1,29 +1,29 @@
import {parse, Parser} from 'csv-parse' import { parse, Parser } from "csv-parse";
class ParserWithColumns extends Parser { class ParserWithColumns extends Parser {
/** @type {Array<string>} */ /** @type {Array<string>} */
columns = [] columns = [];
/** @type {Map<string, number> | null} */ /** @type {Map<string, number> | null} */
columnsMap = null columnsMap = null;
} }
/** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */ /** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */
export const makeImpl = c => () => { export const makeImpl = (c) => () => {
const parser = new ParserWithColumns(c) const parser = new ParserWithColumns(c);
parser.once('readable', () => { parser.once("readable", () => {
parser.columns = parser.read(); parser.columns = parser.read();
}) });
return parser return parser;
} };
/** @type {(s: ParserWithColumns) => () => Array<string> | null} */ /** @type {(s: ParserWithColumns) => () => Array<string> | null} */
export const readImpl = p => () => p.read(); export const readImpl = (p) => () => p.read();
/** @type {(s: ParserWithColumns) => () => Array<string>} */ /** @type {(s: ParserWithColumns) => () => Array<string>} */
export const columnsArrayImpl = p => () => p.columns export const columnsArrayImpl = (p) => () => p.columns;
/** @type {(s: ParserWithColumns) => () => Map<string, number> | null} */ /** @type {(s: ParserWithColumns) => () => Map<string, number> | null} */
export const columnsMapImpl = p => () => p.columnsMap export const columnsMapImpl = (p) => () => p.columnsMap;
/** @type {(s: ParserWithColumns) => (m: Map<string, number>) => () => void} */ /** @type {(s: ParserWithColumns) => (m: Map<string, number>) => () => void} */
export const setColumnsMapImpl = p => m => () => p.columnsMap = m export const setColumnsMapImpl = (p) => (m) => () => (p.columnsMap = m);

View File

@ -1,14 +1,17 @@
module Node.Stream.CSV.Parse where module Node.Stream.CSV.Parse where
import Prelude import Prelude hiding (join)
import Control.Alt ((<|>)) import Control.Alt ((<|>))
import Control.Alternative (class Alternative)
import Control.Monad.Error.Class (liftEither) import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept) import Control.Monad.Except (runExcept)
import Control.Monad.Fork.Class (class MonadFork, fork, join)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, whileJust) import Control.Monad.Rec.Class (class MonadRec, tailRecM, Step(..), whileJust)
import Control.Monad.ST.Global as ST import Control.Monad.ST.Global as ST
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Control.Parallel (class Parallel, parTraverse)
import Data.Array as Array import Data.Array as Array
import Data.Array.ST as Array.ST import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap) import Data.Bifunctor (lmap)
@ -21,7 +24,6 @@ import Data.Maybe (Maybe(..))
import Data.Newtype (wrap) import Data.Newtype (wrap)
import Data.Nullable (Nullable) import Data.Nullable (Nullable)
import Data.Nullable as Nullable import Data.Nullable as Nullable
import Data.Traversable (for_)
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Canceler(..), delay, makeAff) import Effect.Aff (Canceler(..), delay, makeAff)
import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Aff.Class (class MonadAff, liftAff)
@ -88,7 +90,7 @@ make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign
-- | Synchronously parse a CSV string -- | Synchronously parse a CSV string
parse :: forall @r rl @config missing extra m. MonadAff m => MonadRec m => RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> String -> m (Array { | r }) parse :: forall @r rl @config missing extra m p f. Parallel p m => MonadFork f m => Alternative p => MonadAff m => MonadRec m => RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> String -> m (Array { | r })
parse config csv = do parse config csv = do
stream <- liftEffect $ make @r @config @missing @extra config stream <- liftEffect $ make @r @config @missing @extra config
void $ liftEffect $ Stream.writeString stream UTF8 csv void $ liftEffect $ Stream.writeString stream UTF8 csv
@ -96,17 +98,21 @@ parse config csv = do
readAll stream readAll stream
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed. -- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach :: forall @r rl x m. MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> m Unit) -> m Unit foreach :: forall @r rl x m f p. Parallel p m => Alternative p => MonadFork f m => MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> m Unit) -> m Unit
foreach stream cb = whileJust do foreach stream cb = whileJust do
liftAff $ delay $ wrap 0.0 liftAff $ delay $ wrap 0.0
isReadable <- liftEffect $ Stream.readable stream isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) $ makeAff \res -> do liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop pure $ Canceler $ const $ liftEffect stop
whileJust do fibers <- flip tailRecM [] \fibers -> do
r <- liftEffect $ read @r stream r <- liftEffect $ read @r stream
for_ r cb case r of
pure $ void r Just r' -> do
f <- fork (cb r')
pure $ Loop $ fibers <> [ f ]
Nothing -> pure $ Done fibers
void $ parTraverse join fibers
isClosed <- liftEffect $ Stream.closed stream isClosed <- liftEffect $ Stream.closed stream
pure $ if isClosed then Nothing else Just unit pure $ if isClosed then Nothing else Just unit
@ -122,7 +128,7 @@ read stream = runMaybeT do
liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw
-- | Collect all parsed records into an array -- | Collect all parsed records into an array
readAll :: forall @r rl a m. MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> m (Array { | r }) readAll :: forall @r rl a m p f. Parallel p m => MonadFork f m => Alternative p => MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> m (Array { | r })
readAll stream = do readAll stream = do
records <- liftEffect $ ST.toEffect $ Array.ST.new records <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records

View File

@ -1,7 +1,7 @@
import {stringify} from 'csv-stringify' import { stringify } from "csv-stringify";
/** @type {(c: import('csv-stringify').Options) => () => import('csv-stringify').Stringifier} */ /** @type {(c: import('csv-stringify').Options) => () => import('csv-stringify').Stringifier} */
export const makeImpl = c => () => stringify(c) export const makeImpl = (c) => () => stringify(c);
/** @type {(s: import('csv-stringify').Stringifier) => (vals: Array<string>) => () => void} */ /** @type {(s: import('csv-stringify').Stringifier) => (vals: Array<string>) => () => void} */
export const writeImpl = s => vals => () => s.write(vals) export const writeImpl = (s) => (vals) => () => s.write(vals);