fix: more efficiently / less blockingly read?

This commit is contained in:
orion kindel 2024-05-03 10:54:12 -05:00
parent 5f8a82c8d8
commit 87f42c3bfe
Signed by: orion
GPG Key ID: 6D4165AE4C928719
3 changed files with 56 additions and 28 deletions

View File

@ -15,7 +15,6 @@ workspace:
- foldable-traversable: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0" - foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0" - foreign-object: ">=4.1.0 <5.0.0"
- fork: ">=6.0.0 <7.0.0"
- integers: ">=6.0.0 <7.0.0" - integers: ">=6.0.0 <7.0.0"
- lists: ">=7.0.0 <8.0.0" - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
@ -61,7 +60,6 @@ workspace:
- foldable-traversable - foldable-traversable
- foreign - foreign
- foreign-object - foreign-object
- fork
- formatters - formatters
- functions - functions
- functors - functors
@ -342,12 +340,6 @@ packages:
- tuples - tuples
- typelevel-prelude - typelevel-prelude
- unfoldable - unfoldable
fork:
type: registry
version: 6.0.0
integrity: sha256-X7u0SuCvFbLbzuNEKLBNuWjmcroqMqit4xEzpQwAP7E=
dependencies:
- aff
formatters: formatters:
type: registry type: registry
version: 7.0.0 version: 7.0.0

View File

@ -22,7 +22,6 @@ package:
- foldable-traversable: ">=6.0.0 <7.0.0" - foldable-traversable: ">=6.0.0 <7.0.0"
- foreign: ">=7.0.0 <8.0.0" - foreign: ">=7.0.0 <8.0.0"
- foreign-object: ">=4.1.0 <5.0.0" - foreign-object: ">=4.1.0 <5.0.0"
- fork: ">=6.0.0 <7.0.0"
- integers: ">=6.0.0 <7.0.0" - integers: ">=6.0.0 <7.0.0"
- lists: ">=7.0.0 <8.0.0" - lists: ">=7.0.0 <8.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
@ -33,6 +32,7 @@ package:
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0" - numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0" - precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"

View File

@ -5,11 +5,12 @@ import Prelude hiding (join)
import Control.Alt ((<|>)) import Control.Alt ((<|>))
import Control.Monad.Error.Class (liftEither) import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept) import Control.Monad.Except (runExcept)
import Control.Monad.Fork.Class (class MonadFork, fork, join)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, tailRecM, Step(..), whileJust) import Control.Monad.Rec.Class (class MonadRec, whileJust)
import Control.Monad.ST.Global as ST import Control.Monad.ST.Global as ST
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Control.MonadPlus (class Alternative)
import Control.Parallel (class Parallel, parTraverse_)
import Data.Array as Array import Data.Array as Array
import Data.Array.ST as Array.ST import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap) import Data.Bifunctor (lmap)
@ -18,13 +19,13 @@ import Data.Either (Either(..))
import Data.Filterable (filter) import Data.Filterable (filter)
import Data.Map (Map) import Data.Map (Map)
import Data.Map as Map import Data.Map as Map
import Data.Maybe (Maybe(..)) import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (wrap)
import Data.Nullable (Nullable) import Data.Nullable (Nullable)
import Data.Nullable as Nullable import Data.Nullable as Nullable
import Data.Traversable (traverse) import Data.Traversable (for)
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Canceler(..), delay, makeAff) import Effect as Effect
import Effect.Aff (Canceler(..), makeAff)
import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (error) import Effect.Exception (error)
@ -89,7 +90,18 @@ make :: forall @r rl @config @missing @extra. RowToList r rl => ReadCSVRecord r
make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: false, cast: false, cast_date: false }) <<< recordToForeign
-- | Synchronously parse a CSV string -- | Synchronously parse a CSV string
parse :: forall @r rl @config missing extra m f. MonadFork f m => MonadAff m => MonadRec m => RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) => { | config } -> String -> m (Array { | r }) parse
:: forall @r rl @config missing extra m p
. Alternative p
=> Parallel p m
=> MonadAff m
=> MonadRec m
=> RowToList r rl
=> ReadCSVRecord r rl
=> Union config missing (Config extra)
=> { | config }
-> String
-> m (Array { | r })
parse config csv = do parse config csv = do
stream <- liftEffect $ make @r @config @missing @extra config stream <- liftEffect $ make @r @config @missing @extra config
void $ liftEffect $ Stream.writeString stream UTF8 csv void $ liftEffect $ Stream.writeString stream UTF8 csv
@ -97,21 +109,31 @@ parse config csv = do
readAll stream readAll stream
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed. -- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach :: forall @r rl x m f. MonadFork f m => MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> m Unit) -> m Unit foreach
:: forall @r rl x m p
. Alternative p
=> Parallel p m
=> MonadRec m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r x
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb = whileJust do foreach stream cb = whileJust do
isReadable <- liftEffect $ Stream.readable stream isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) $ makeAff \res -> do liftAff $ when (not isReadable) $ makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop pure $ Canceler $ const $ liftEffect stop
fibers <- flip tailRecM [] \fibers -> do
liftAff $ delay $ wrap 0.0 recordsST <- liftEffect $ ST.toEffect $ Array.ST.new
r <- liftEffect $ read @r stream liftEffect $ Effect.untilE do
case r of r <- read @r stream
Just r' -> do void $ for r $ ST.toEffect <<< flip Array.ST.push recordsST
f <- fork (cb r') pure $ isNothing r
pure $ Loop $ fibers <> [ f ] records <- liftEffect $ ST.toEffect $ Array.ST.unsafeFreeze recordsST
Nothing -> pure $ Done fibers
void $ traverse join fibers parTraverse_ cb records
isClosed <- liftEffect $ Stream.closed stream isClosed <- liftEffect $ Stream.closed stream
pure $ if isClosed then Nothing else Just unit pure $ if isClosed then Nothing else Just unit
@ -120,14 +142,28 @@ foreach stream cb = whileJust do
-- | Returns `Nothing` when either: -- | Returns `Nothing` when either:
-- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`) -- | - The internal buffer of parsed records has been exhausted, but there will be more (`Node.Stream.readable` and `Node.Stream.closed` are both `false`)
-- | - All records have been processed (`Node.Stream.closed` is `true`) -- | - All records have been processed (`Node.Stream.closed` is `true`)
read :: forall @r rl a. RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> Effect (Maybe { | r }) read
:: forall @r rl a
. RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> Effect (Maybe { | r })
read stream = runMaybeT do read stream = runMaybeT do
cols <- MaybeT $ getOrInitColumnsMap stream cols <- MaybeT $ getOrInitColumnsMap stream
raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream raw :: Array String <- MaybeT $ Nullable.toMaybe <$> readImpl stream
liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols raw
-- | Collect all parsed records into an array -- | Collect all parsed records into an array
readAll :: forall @r rl a m f. MonadFork f m => MonadRec m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r a -> m (Array { | r }) readAll
:: forall @r rl a m p
. Alternative p
=> Parallel p m
=> MonadRec m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r a
-> m (Array { | r })
readAll stream = do readAll stream = do
records <- liftEffect $ ST.toEffect $ Array.ST.new records <- liftEffect $ ST.toEffect $ Array.ST.new
foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records foreach stream $ void <<< liftEffect <<< ST.toEffect <<< flip Array.ST.push records