fix: maybe this is faster?

This commit is contained in:
orion 2024-05-03 12:47:12 -05:00
parent f3d9ea8c11
commit d6638ead1d
Signed by: orion
GPG Key ID: 6D4165AE4C928719

View File

@ -7,11 +7,11 @@ import Control.Alternative (guard)
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST
import Control.Monad.Rec.Class (class MonadRec, untilJust, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Control.MonadPlus (class Alternative)
import Control.Parallel (class Parallel, parTraverse_)
import Control.Parallel (class Parallel, parSequence_)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap)
@ -123,28 +123,52 @@ foreach
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb =
whileJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
do
q <- liftEffect $ liftST $ Array.ST.new
guard =<< not <$> liftEffect (Stream.closed stream)
let
deque = liftEffect $ liftST $ Array.ST.shift q
enque a = liftEffect $ liftST $ Array.ST.push a q
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
waitReadable =
makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
recordsST <- liftEffect $ ST.toEffect $ Array.ST.new
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST
pure $ isNothing r
records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST
processQ =
untilJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
r <- deque
isClosed <- liftEffect $ Stream.closed stream
if isClosed && isNothing r then
pure unit
else if isNothing r then
liftAff $ delay $ wrap 10.0
else do
r' <- MaybeT $ pure r
lift $ cb r'
guard $ isClosed
pure unit
lift $ parTraverse_ cb records
guard =<< not <$> liftEffect (Stream.closed stream)
pure unit
readToQ =
whileJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
guard =<< not <$> liftEffect (Stream.closed stream)
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) waitReadable
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r enque
pure $ isNothing r
guard =<< not <$> liftEffect (Stream.closed stream)
pure unit
parSequence_ [readToQ, processQ]
-- | Reads a parsed record from the stream.
-- |
@ -174,9 +198,9 @@ readAll
=> CSVParser r a
-> m (Array { | r })
readAll stream = do
records <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records
liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze records
records <- liftEffect $ liftST $ Array.ST.new
foreach stream $ void <<< liftEffect <<< liftST <<< flip Array.ST.push records
liftEffect $ liftST $ Array.ST.unsafeFreeze records
-- | `data` event. Emitted when a CSV record has been parsed.
dataH :: forall r a. EventHandle1 (CSVParser r a) { | r }