fix: prematurely ending

This commit is contained in:
orion 2024-05-07 08:35:48 -05:00
parent 092276ed4c
commit aa7fb66b74
Signed by: orion
GPG Key ID: 6D4165AE4C928719

View File

@ -7,7 +7,10 @@ import Control.Monad.Error.Class (liftEither, liftMaybe)
import Control.Monad.Except (runExcept)
import Control.Monad.Except.Trans (catchError)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (untilJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Global as ST
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift)
import Data.Array as Array
import Data.Array.ST as Array.ST
@ -18,10 +21,11 @@ import Data.Filterable (filter)
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (wrap)
import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Effect (Effect)
import Effect.Aff (Canceler(..), makeAff)
import Effect.Aff (Canceler(..), delay, launchAff_, makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
@ -121,16 +125,24 @@ foreach stream cb = do
pure $ Canceler $ const $ liftEffect stop
liftAff $ makeAff \res -> do
count <- ST.toEffect $ STRef.new 0
removeDataListener <- flip (Event.on dataH) stream \row ->
void
$ flip catchError (res <<< Left)
$ do
void $ ST.toEffect $ STRef.modify (_ + 1) count
cols <- liftMaybe (error "unreachable") =<< getOrInitColumnsMap stream
record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row
flip catchError (liftEffect <<< res <<< Left) (cb record)
void $ ST.toEffect $ STRef.modify (_ - 1) count
removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit)
removeErrorListener <- flip (Event.once Stream.errorH) stream (res <<< Left)
removeEndListener <- flip (Event.once Stream.endH) stream $ launchAff_ do
untilJust do
delay $ wrap 10.0
ct <- liftEffect $ ST.toEffect $ STRef.read count
pure $ if ct == 0 then Just unit else Nothing
liftEffect $ res $ Right unit
pure $ Canceler $ const $ liftEffect do
removeDataListener