fix: add columns event

This commit is contained in:
orion 2024-05-03 14:00:35 -05:00
parent c8d83e8cf3
commit 093fff058d
Signed by: orion
GPG Key ID: 6D4165AE4C928719
2 changed files with 14 additions and 6 deletions

View File

@ -12,6 +12,7 @@ export const makeImpl = (c) => () => {
const parser = new ParserWithColumns(c); const parser = new ParserWithColumns(c);
parser.once("readable", () => { parser.once("readable", () => {
parser.columns = parser.read(); parser.columns = parser.read();
parser.emit('columns', parser.columns)
}); });
return parser; return parser;
}; };

View File

@ -3,7 +3,7 @@ module Node.Stream.CSV.Parse where
import Prelude hiding (join) import Prelude hiding (join)
import Control.Alt ((<|>)) import Control.Alt ((<|>))
import Control.Monad.Error.Class (liftEither) import Control.Monad.Error.Class (liftEither, liftMaybe)
import Control.Monad.Except (runExcept) import Control.Monad.Except (runExcept)
import Control.Monad.Except.Trans (catchError) import Control.Monad.Except.Trans (catchError)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
@ -15,7 +15,6 @@ import Data.Bifunctor (lmap)
import Data.CSV.Record (class ReadCSVRecord, readCSVRecord) import Data.CSV.Record (class ReadCSVRecord, readCSVRecord)
import Data.Either (Either(..)) import Data.Either (Either(..))
import Data.Filterable (filter) import Data.Filterable (filter)
import Data.Foldable (for_)
import Data.Map (Map) import Data.Map (Map)
import Data.Map as Map import Data.Map as Map
import Data.Maybe (Maybe(..)) import Data.Maybe (Maybe(..))
@ -117,12 +116,16 @@ foreach
-> m Unit -> m Unit
foreach stream cb = do foreach stream cb = do
UnliftAff unlift <- askUnliftAff UnliftAff unlift <- askUnliftAff
liftAff $ makeAff \res -> pure mempty <* flip (Event.once columnsH) stream $ const do
void $ getOrInitColumnsMap stream
res $ Right unit
liftAff $ makeAff \res -> do liftAff $ makeAff \res -> do
removeDataListener <- flip (Event.on dataH) stream \row -> launchAff_ $ delay (wrap 0.0) <* liftEffect do removeDataListener <- flip (Event.on dataH) stream \row -> launchAff_ $ delay (wrap 0.0) <* liftEffect do
cols <- getOrInitColumnsMap stream cols <- liftMaybe (error "unreachable") =<< getOrInitColumnsMap stream
for_ cols \cols' -> do record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row
record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' row launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record)
launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record)
removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit) removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit)
removeErrorListener <- flip (Event.on Stream.errorH) stream (res <<< Left) removeErrorListener <- flip (Event.on Stream.errorH) stream (res <<< Left)
@ -165,6 +168,10 @@ readAll stream = do
dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String) dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
dataH = EventHandle "data" mkEffectFn1 dataH = EventHandle "data" mkEffectFn1
-- | `columns` event. Emitted when the header row has been parsed.
columnsH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
columnsH = EventHandle "columns" mkEffectFn1
-- | FFI -- | FFI
foreign import makeImpl :: forall r. Foreign -> Effect (Stream r) foreign import makeImpl :: forall r. Foreign -> Effect (Stream r)