diff --git a/src/Node.Stream.CSV.Parse.purs b/src/Node.Stream.CSV.Parse.purs index 47b4801..fb70e7d 100644 --- a/src/Node.Stream.CSV.Parse.purs +++ b/src/Node.Stream.CSV.Parse.purs @@ -3,6 +3,7 @@ module Node.Stream.CSV.Parse where import Prelude hiding (join) import Control.Alt ((<|>)) +import Control.Alternative (guard) import Control.Monad.Error.Class (liftEither) import Control.Monad.Except (runExcept) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) @@ -20,12 +21,13 @@ import Data.Filterable (filter) import Data.Map (Map) import Data.Map as Map import Data.Maybe (Maybe(..), isNothing) +import Data.Newtype (wrap) import Data.Nullable (Nullable) import Data.Nullable as Nullable import Data.Traversable (for) import Effect (Effect) import Effect as Effect -import Effect.Aff (Canceler(..), makeAff) +import Effect.Aff (Canceler(..), delay, makeAff) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Class (liftEffect) import Effect.Exception (error) @@ -120,22 +122,29 @@ foreach => CSVParser r x -> ({ | r } -> m Unit) -> m Unit -foreach stream cb = whileJust do - isReadable <- liftEffect $ Stream.readable stream - liftAff $ when (not isReadable) $ makeAff \res -> do - stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit - pure $ Canceler $ const $ liftEffect stop +foreach stream cb = + whileJust + $ runMaybeT + $ do + liftAff $ delay $ wrap 0.0 - recordsST <- liftEffect $ ST.toEffect $ Array.ST.new - liftEffect $ Effect.untilE do - r <- read @r stream - void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST - pure $ isNothing r - records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST + guard =<< not <$> liftEffect (Stream.closed stream) - parTraverse_ cb records - isClosed <- liftEffect $ Stream.closed stream - pure $ if isClosed then Nothing else Just unit + isReadable <- liftEffect $ Stream.readable stream + liftAff $ when (not isReadable) $ makeAff \res -> do + stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit + pure $ Canceler $ const $ liftEffect stop + + recordsST <- liftEffect $ ST.toEffect $ Array.ST.new + liftEffect $ Effect.untilE do + r <- read @r stream + void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST + pure $ isNothing r + records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST + + lift $ parTraverse_ cb records + guard =<< not <$> liftEffect (Stream.closed stream) + pure unit -- | Reads a parsed record from the stream. -- |