fix: timeslice shenanigans

This commit is contained in:
orion 2024-05-03 10:58:47 -05:00
parent a3625ab1b7
commit 07c86f096f
Signed by: orion
GPG Key ID: 6D4165AE4C928719

View File

@ -3,6 +3,7 @@ module Node.Stream.CSV.Parse where
import Prelude hiding (join) import Prelude hiding (join)
import Control.Alt ((<|>)) import Control.Alt ((<|>))
import Control.Alternative (guard)
import Control.Monad.Error.Class (liftEither) import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept) import Control.Monad.Except (runExcept)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
@ -20,12 +21,13 @@ import Data.Filterable (filter)
import Data.Map (Map) import Data.Map (Map)
import Data.Map as Map import Data.Map as Map
import Data.Maybe (Maybe(..), isNothing) import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (wrap)
import Data.Nullable (Nullable) import Data.Nullable (Nullable)
import Data.Nullable as Nullable import Data.Nullable as Nullable
import Data.Traversable (for) import Data.Traversable (for)
import Effect (Effect) import Effect (Effect)
import Effect as Effect import Effect as Effect
import Effect.Aff (Canceler(..), makeAff) import Effect.Aff (Canceler(..), delay, makeAff)
import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (error) import Effect.Exception (error)
@ -120,22 +122,29 @@ foreach
=> CSVParser r x => CSVParser r x
-> ({ | r } -> m Unit) -> ({ | r } -> m Unit)
-> m Unit -> m Unit
foreach stream cb = whileJust do foreach stream cb =
isReadable <- liftEffect $ Stream.readable stream whileJust
liftAff $ when (not isReadable) $ makeAff \res -> do $ runMaybeT
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit $ do
pure $ Canceler $ const $ liftEffect stop liftAff $ delay $ wrap 0.0
recordsST <- liftEffect $ ST.toEffect $ Array.ST.new guard =<< not <$> liftEffect (Stream.closed stream)
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST
pure $ isNothing r
records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST
parTraverse_ cb records isReadable <- liftEffect $ Stream.readable stream
isClosed <- liftEffect $ Stream.closed stream liftAff $ when (not isReadable) $ makeAff \res -> do
pure $ if isClosed then Nothing else Just unit stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
recordsST <- liftEffect $ ST.toEffect $ Array.ST.new
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST
pure $ isNothing r
records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST
lift $ parTraverse_ cb records
guard =<< not <$> liftEffect (Stream.closed stream)
pure unit
-- | Reads a parsed record from the stream. -- | Reads a parsed record from the stream.
-- | -- |