diff --git a/spago.lock b/spago.lock index 4bf11a0..92197d8 100644 --- a/spago.lock +++ b/spago.lock @@ -10,6 +10,7 @@ workspace: - effect: ">=4.0.0 <5.0.0" - either: ">=6.1.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0" + - filterable - foldable-traversable: ">=6.0.0 <7.0.0" - foreign: ">=7.0.0 <8.0.0" - foreign-object: ">=4.1.0 <5.0.0" @@ -51,6 +52,7 @@ workspace: - enums - exceptions - exists + - filterable - fixed-points - foldable-traversable - foreign @@ -268,6 +270,17 @@ packages: integrity: sha256-A0JQHpTfo1dNOj9U5/Fd3xndlRSE0g2IQWOGor2yXn8= dependencies: - unsafe-coerce + filterable: + type: registry + version: 5.0.0 + integrity: sha256-cCojJHRnTmpY1j1kegI4CFwghdQ2Fm/8dzM8IlC+lng= + dependencies: + - arrays + - either + - foldable-traversable + - identity + - lists + - ordered-collections fixed-points: type: registry version: 7.0.0 diff --git a/spago.yaml b/spago.yaml index 8b8aa08..1a55206 100644 --- a/spago.yaml +++ b/spago.yaml @@ -10,6 +10,9 @@ package: strict: true pedanticPackages: true dependencies: + - control + - filterable + - ordered-collections - aff: ">=7.1.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0" - bifunctors: ">=6.0.0 <7.0.0" diff --git a/src/Data.CSV.Record.purs b/src/Data.CSV.Record.purs index 9c3101a..efd51eb 100644 --- a/src/Data.CSV.Record.purs +++ b/src/Data.CSV.Record.purs @@ -7,8 +7,10 @@ import Control.Monad.Except (Except) import Data.Array as Array import Data.CSV (class ReadCSV, class WriteCSV, readCSV, writeCSV) import Data.List.NonEmpty (NonEmptyList) +import Data.Map (Map) +import Data.Map as Map import Data.Maybe (fromMaybe) -import Data.Symbol (class IsSymbol) +import Data.Symbol (class IsSymbol, reflectSymbol) import Foreign (ForeignError(..)) import Prim.Row (class Cons, class Lacks) import Prim.RowList (class RowToList, Cons, Nil, RowList) @@ -31,14 +33,17 @@ instance WriteCSVRecord () Nil where class ReadCSVRecord :: Row Type -> RowList Type -> Constraint class RowToList r rl <= ReadCSVRecord r rl | rl -> r where - readCSVRecord :: Array String -> Except (NonEmptyList ForeignError) { | r } + readCSVRecord :: Map String Int -> Array String -> Except (NonEmptyList ForeignError) { | r } instance (RowToList r (Cons k v tailrl), IsSymbol k, ReadCSV v, Lacks k tail, Cons k v tail r, ReadCSVRecord tail tailrl) => ReadCSVRecord r (Cons k v tailrl) where - readCSVRecord vals = do - valraw <- liftMaybe (pure $ ForeignError "unexpected end of record") $ Array.head vals + readCSVRecord cols vals = do + let + k = reflectSymbol (Proxy @k) + pos <- liftMaybe (pure $ ForeignError $ "row too long; did not expect value " <> k) $ Map.lookup k cols + valraw <- liftMaybe (pure $ ForeignError "unexpected end of record") $ Array.index vals pos val <- readCSV @v valraw - tail <- readCSVRecord @tail @tailrl (fromMaybe [] $ Array.tail vals) + tail <- readCSVRecord @tail @tailrl cols (fromMaybe [] $ Array.deleteAt pos vals) pure $ Record.insert (Proxy @k) val tail instance ReadCSVRecord () Nil where - readCSVRecord _ = pure {} + readCSVRecord _ _ = pure {} diff --git a/src/Node.Stream.CSV.Parse.js b/src/Node.Stream.CSV.Parse.js index a9bd20c..e0483ab 100644 --- a/src/Node.Stream.CSV.Parse.js +++ b/src/Node.Stream.CSV.Parse.js @@ -1,7 +1,36 @@ -import {parse} from 'csv-parse' +import {parse, Parser} from 'csv-parse' -/** @type {(s: import('csv-parse').Options) => () => import('csv-parse').Parser} */ -export const makeImpl = c => () => parse(c) +class ParserWithColumns extends Parser { + /** @type {Array} */ + columns = [] + /** @type {Map | null} */ + columnsMap = null +} -/** @type {(s: import('stream').Duplex) => () => string[] | null} */ -export const readImpl = s => () => s.read(); +/** @type {(s: import('csv-parse').Options) => () => ParserWithColumns} */ +export const makeImpl = c => () => { + const parser = new ParserWithColumns(c) + parser.once('data', columns => { + parser.columns = columns; + }) + return parser +} + +/** @type {(s: ParserWithColumns) => () => Array | null} */ +export const readImpl = p => () => { + const chunk = p.read(); + if (chunk === p.columns) { + return p.read() + } else { + return chunk + } +} + +/** @type {(s: ParserWithColumns) => () => Array} */ +export const columnsArrayImpl = p => () => p.columns + +/** @type {(s: ParserWithColumns) => () => Map | null} */ +export const columnsMapImpl = p => () => p.columnsMap + +/** @type {(s: ParserWithColumns) => (m: Map) => () => void} */ +export const setColumnsMapImpl = p => m => () => p.columnsMap = m diff --git a/src/Node.Stream.CSV.Parse.purs b/src/Node.Stream.CSV.Parse.purs index e0730a3..97216e2 100644 --- a/src/Node.Stream.CSV.Parse.purs +++ b/src/Node.Stream.CSV.Parse.purs @@ -2,15 +2,20 @@ module Node.Stream.CSV.Parse where import Prelude +import Control.Alt ((<|>)) import Control.Monad.Error.Class (liftEither) import Control.Monad.Except (runExcept) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) import Control.Monad.Rec.Class (whileJust) import Control.Monad.ST.Global as ST +import Control.Monad.Trans.Class (lift) +import Data.Array as Array import Data.Array.ST as Array.ST import Data.Bifunctor (lmap) import Data.CSV.Record (class ReadCSVRecord, readCSVRecord) import Data.Either (Either(..)) +import Data.Filterable (filter) +import Data.Map (Map) import Data.Maybe (Maybe(..)) import Data.Nullable (Nullable) import Data.Nullable as Nullable @@ -22,6 +27,7 @@ import Effect.Exception (error) import Effect.Uncurried (mkEffectFn1) import Foreign (Foreign, unsafeToForeign) import Foreign.Object (Object) +import Data.Map as Map import Foreign.Object as Object import Node.Encoding (Encoding(..)) import Node.EventEmitter (EventHandle(..)) @@ -107,7 +113,8 @@ foreach stream cb = whileJust do read :: forall @r rl a. RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> Effect (Maybe { | r }) read stream = runMaybeT do raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream - liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl raw + cols <- MaybeT $ getOrInitColumnsMap stream + liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw -- | Collect all parsed records into an array readAll :: forall @r rl a. RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> Aff (Array { | r }) @@ -126,6 +133,30 @@ foreign import makeImpl :: forall r. Foreign -> Effect (Stream r) -- | FFI foreign import readImpl :: forall r. Stream r -> Effect (Nullable (Array String)) +-- | FFI +foreign import columnsArrayImpl :: forall r. Stream r -> Effect (Array String) + +-- | FFI +foreign import columnsMapImpl :: forall r. Stream r -> Effect (Nullable (Map String Int)) + +-- | FFI +foreign import setColumnsMapImpl :: forall r. Stream r -> Map String Int -> Effect Unit + +-- | FFI +getOrInitColumnsMap :: forall r x. CSVParser r x -> Effect (Maybe (Map String Int)) +getOrInitColumnsMap s = runMaybeT do + cols :: Array String <- MaybeT $ filter (not <<< Array.null) <$> Just <$> columnsArrayImpl s + let + get = MaybeT $ Nullable.toMaybe <$> columnsMapImpl s + init = do + let + ixs = Array.range 0 (Array.length cols - 1) + assoc = Array.zip cols ixs + map = Map.fromFoldable assoc + lift $ setColumnsMapImpl s map + pure map + get <|> init + -- | FFI recordToForeign :: forall r. Record r -> Object Foreign recordToForeign = unsafeCoerce