diff --git a/src/Node.Stream.CSV.Parse.purs b/src/Node.Stream.CSV.Parse.purs index 091f876..35aab5f 100644 --- a/src/Node.Stream.CSV.Parse.purs +++ b/src/Node.Stream.CSV.Parse.purs @@ -7,7 +7,10 @@ import Control.Monad.Error.Class (liftEither, liftMaybe) import Control.Monad.Except (runExcept) import Control.Monad.Except.Trans (catchError) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) +import Control.Monad.Rec.Class (untilJust) import Control.Monad.ST.Class (liftST) +import Control.Monad.ST.Global as ST +import Control.Monad.ST.Ref as STRef import Control.Monad.Trans.Class (lift) import Data.Array as Array import Data.Array.ST as Array.ST @@ -18,10 +21,11 @@ import Data.Filterable (filter) import Data.Map (Map) import Data.Map as Map import Data.Maybe (Maybe(..), isNothing) +import Data.Newtype (wrap) import Data.Nullable (Nullable) import Data.Nullable as Nullable import Effect (Effect) -import Effect.Aff (Canceler(..), makeAff) +import Effect.Aff (Canceler(..), delay, launchAff_, makeAff) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Class (liftEffect) import Effect.Exception (error) @@ -121,16 +125,24 @@ foreach stream cb = do pure $ Canceler $ const $ liftEffect stop liftAff $ makeAff \res -> do + count <- ST.toEffect $ STRef.new 0 removeDataListener <- flip (Event.on dataH) stream \row -> void $ flip catchError (res <<< Left) $ do + void $ ST.toEffect $ STRef.modify (_ + 1) count cols <- liftMaybe (error "unreachable") =<< getOrInitColumnsMap stream record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row flip catchError (liftEffect <<< res <<< Left) (cb record) + void $ ST.toEffect $ STRef.modify (_ - 1) count - removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit) removeErrorListener <- flip (Event.once Stream.errorH) stream (res <<< Left) + removeEndListener <- flip (Event.once Stream.endH) stream $ launchAff_ do + untilJust do + delay $ wrap 10.0 + ct <- liftEffect $ ST.toEffect $ STRef.read count + pure $ if ct == 0 then Just unit else Nothing + liftEffect $ res $ Right unit pure $ Canceler $ const $ liftEffect do removeDataListener