From 093fff058d282d122def6754c0732ebd91e6a511 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Fri, 3 May 2024 14:00:35 -0500 Subject: [PATCH] fix: add columns event --- src/Node.Stream.CSV.Parse.js | 1 + src/Node.Stream.CSV.Parse.purs | 19 +++++++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/Node.Stream.CSV.Parse.js b/src/Node.Stream.CSV.Parse.js index 8109df9..a8e31a2 100644 --- a/src/Node.Stream.CSV.Parse.js +++ b/src/Node.Stream.CSV.Parse.js @@ -12,6 +12,7 @@ export const makeImpl = (c) => () => { const parser = new ParserWithColumns(c); parser.once("readable", () => { parser.columns = parser.read(); + parser.emit('columns', parser.columns) }); return parser; }; diff --git a/src/Node.Stream.CSV.Parse.purs b/src/Node.Stream.CSV.Parse.purs index ec6f6ef..1521a82 100644 --- a/src/Node.Stream.CSV.Parse.purs +++ b/src/Node.Stream.CSV.Parse.purs @@ -3,7 +3,7 @@ module Node.Stream.CSV.Parse where import Prelude hiding (join) import Control.Alt ((<|>)) -import Control.Monad.Error.Class (liftEither) +import Control.Monad.Error.Class (liftEither, liftMaybe) import Control.Monad.Except (runExcept) import Control.Monad.Except.Trans (catchError) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) @@ -15,7 +15,6 @@ import Data.Bifunctor (lmap) import Data.CSV.Record (class ReadCSVRecord, readCSVRecord) import Data.Either (Either(..)) import Data.Filterable (filter) -import Data.Foldable (for_) import Data.Map (Map) import Data.Map as Map import Data.Maybe (Maybe(..)) @@ -117,12 +116,16 @@ foreach -> m Unit foreach stream cb = do UnliftAff unlift <- askUnliftAff + + liftAff $ makeAff \res -> pure mempty <* flip (Event.once columnsH) stream $ const do + void $ getOrInitColumnsMap stream + res $ Right unit + liftAff $ makeAff \res -> do removeDataListener <- flip (Event.on dataH) stream \row -> launchAff_ $ delay (wrap 0.0) <* liftEffect do - cols <- getOrInitColumnsMap stream - for_ cols \cols' -> do - record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' row - launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record) + cols <- liftMaybe (error "unreachable") =<< getOrInitColumnsMap stream + record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row + launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record) removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit) removeErrorListener <- flip (Event.on Stream.errorH) stream (res <<< Left) @@ -165,6 +168,10 @@ readAll stream = do dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String) dataH = EventHandle "data" mkEffectFn1 +-- | `columns` event. Emitted when the header row has been parsed. +columnsH :: forall r a. EventHandle1 (CSVParser r a) (Array String) +columnsH = EventHandle "columns" mkEffectFn1 + -- | FFI foreign import makeImpl :: forall r. Foreign -> Effect (Stream r)