diff --git a/src/Node.Stream.CSV.Parse.purs b/src/Node.Stream.CSV.Parse.purs index fb70e7d..fd5fa6f 100644 --- a/src/Node.Stream.CSV.Parse.purs +++ b/src/Node.Stream.CSV.Parse.purs @@ -7,11 +7,11 @@ import Control.Alternative (guard) import Control.Monad.Error.Class (liftEither) import Control.Monad.Except (runExcept) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) -import Control.Monad.Rec.Class (class MonadRec, whileJust) -import Control.Monad.ST.Global as ST +import Control.Monad.Rec.Class (class MonadRec, untilJust, whileJust) +import Control.Monad.ST.Class (liftST) import Control.Monad.Trans.Class (lift) import Control.MonadPlus (class Alternative) -import Control.Parallel (class Parallel, parTraverse_) +import Control.Parallel (class Parallel, parSequence_) import Data.Array as Array import Data.Array.ST as Array.ST import Data.Bifunctor (lmap) @@ -123,29 +123,53 @@ foreach -> ({ | r } -> m Unit) -> m Unit foreach stream cb = - whileJust - $ runMaybeT - $ do - liftAff $ delay $ wrap 0.0 + do + q <- liftEffect $ liftST $ Array.ST.new - guard =<< not <$> liftEffect (Stream.closed stream) + let + deque = liftEffect $ liftST $ Array.ST.shift q + enque a = liftEffect $ liftST $ Array.ST.push a q - isReadable <- liftEffect $ Stream.readable stream - liftAff $ when (not isReadable) $ makeAff \res -> do - stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit - pure $ Canceler $ const $ liftEffect stop + waitReadable = + makeAff \res -> do + stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit + pure $ Canceler $ const $ liftEffect stop - recordsST <- liftEffect $ ST.toEffect $ Array.ST.new - liftEffect $ Effect.untilE do - r <- read @r stream - void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST - pure $ isNothing r - records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST + processQ = + untilJust + $ runMaybeT + $ do + liftAff $ delay $ wrap 0.0 + r <- deque + isClosed <- liftEffect $ Stream.closed stream + if isClosed && isNothing r then + pure unit + else if isNothing r then + liftAff $ delay $ wrap 10.0 + else do + r' <- MaybeT $ pure r + lift $ cb r' + guard $ isClosed + pure unit - lift $ parTraverse_ cb records - guard =<< not <$> liftEffect (Stream.closed stream) - pure unit + readToQ = + whileJust + $ runMaybeT + $ do + liftAff $ delay $ wrap 0.0 + guard =<< not <$> liftEffect (Stream.closed stream) + isReadable <- liftEffect $ Stream.readable stream + liftAff $ when (not isReadable) waitReadable + liftEffect $ Effect.untilE do + r <- read @r stream + void $ for r enque + pure $ isNothing r + guard =<< not <$> liftEffect (Stream.closed stream) + pure unit + + parSequence_ [readToQ, processQ] + -- | Reads a parsed record from the stream. -- | -- | Returns `Nothing` when either: @@ -174,9 +198,9 @@ readAll => CSVParser r a -> m (Array { | r }) readAll stream = do - records <- liftEffect $ ST.toEffect $ Array.ST.new - foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records - liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze records + records <- liftEffect $ liftST $ Array.ST.new + foreach stream $ void <<< liftEffect <<< liftST <<< flip Array.ST.push records + liftEffect $ liftST $ Array.ST.unsafeFreeze records -- | `data` event. Emitted when a CSV record has been parsed. dataH :: forall r a. EventHandle1 (CSVParser r a) { | r }