fix: rework to just use streaming mode

This commit is contained in:
orion 2024-05-03 13:40:06 -05:00
parent 2be8960836
commit 42e779a2a7
Signed by: orion
GPG Key ID: 6D4165AE4C928719
3 changed files with 104 additions and 71 deletions

View File

@ -25,7 +25,6 @@ workspace:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
@ -35,6 +34,7 @@ workspace:
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0"
- unlift: ">=1.0.1 <2.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies:
- console
@ -43,6 +43,7 @@ workspace:
- arraybuffer-types
- arrays
- bifunctors
- catenable-lists
- console
- const
- contravariant
@ -61,6 +62,8 @@ workspace:
- foreign
- foreign-object
- formatters
- free
- freet
- functions
- functors
- gen
@ -71,6 +74,7 @@ workspace:
- lazy
- lists
- maybe
- monad-control
- newtype
- node-buffer
- node-event-emitter
@ -100,6 +104,7 @@ workspace:
- typelevel-prelude
- unfoldable
- unicode
- unlift
- unsafe-coerce
extra_packages: {}
packages:
@ -159,6 +164,18 @@ packages:
- newtype
- prelude
- tuples
catenable-lists:
type: registry
version: 7.0.0
integrity: sha256-76vYENhwF4BWTBsjeLuErCH2jqVT4M3R1HX+4RwSftA=
dependencies:
- control
- foldable-traversable
- lists
- maybe
- prelude
- tuples
- unfoldable
console:
type: registry
version: 6.1.0
@ -352,6 +369,40 @@ packages:
- parsing
- prelude
- transformers
free:
type: registry
version: 7.1.0
integrity: sha256-JAumgEsGSzJCNLD8AaFvuX7CpqS5yruCngi6yI7+V5k=
dependencies:
- catenable-lists
- control
- distributive
- either
- exists
- foldable-traversable
- invariant
- lazy
- maybe
- prelude
- tailrec
- transformers
- tuples
- unsafe-coerce
freet:
type: registry
version: 7.0.0
integrity: sha256-zkL6wU4ZPq8xz1kGFxoliWqyhBksepMJTyA68VEBaJo=
dependencies:
- aff
- bifunctors
- effect
- either
- exists
- free
- prelude
- tailrec
- transformers
- tuples
functions:
type: registry
version: 6.0.0
@ -461,6 +512,15 @@ packages:
- invariant
- newtype
- prelude
monad-control:
type: registry
version: 5.0.0
integrity: sha256-bgfDW30wbIm70NR1Tvvh9P+VFQMDh1wK2sSJXCj/dZc=
dependencies:
- aff
- freet
- identity
- lists
newtype:
type: registry
version: 5.0.0
@ -784,6 +844,23 @@ packages:
- foldable-traversable
- maybe
- strings
unlift:
type: registry
version: 1.0.1
integrity: sha256-nbBCVV0fZz/3UHKoW11dcTwBYmQOIgK31ht2BN47RPw=
dependencies:
- aff
- effect
- either
- freet
- identity
- lists
- maybe
- monad-control
- prelude
- st
- transformers
- tuples
unsafe-coerce:
type: registry
version: 6.0.0

View File

@ -32,7 +32,6 @@ package:
- nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0"
- parallel: ">=6.0.0 <7.0.0"
- precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0"
@ -42,6 +41,7 @@ package:
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0"
- unlift: ">=1.0.1 <2.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0"
test:
main: Test.Main

View File

@ -3,15 +3,12 @@ module Node.Stream.CSV.Parse where
import Prelude hiding (join)
import Control.Alt ((<|>))
import Control.Alternative (guard, empty)
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Error.Class (liftEither, liftMaybe)
import Control.Monad.Except (runExcept)
import Control.Monad.Except.Trans (catchError)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (class MonadRec, untilJust, whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Control.MonadPlus (class Alternative)
import Control.Parallel (class Parallel, parSequence_)
import Data.Array as Array
import Data.Array.ST as Array.ST
import Data.Bifunctor (lmap)
@ -20,15 +17,13 @@ import Data.Either (Either(..))
import Data.Filterable (filter)
import Data.Map (Map)
import Data.Map as Map
import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (wrap)
import Data.Maybe (Maybe(..))
import Data.Nullable (Nullable)
import Data.Nullable as Nullable
import Data.Traversable (for)
import Effect (Effect)
import Effect as Effect
import Effect.Aff (Canceler(..), delay, makeAff)
import Effect.Aff (Canceler(..), launchAff_, makeAff)
import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Aff.Unlift (class MonadUnliftAff, UnliftAff(..), askUnliftAff)
import Effect.Class (liftEffect)
import Effect.Exception (error)
import Effect.Uncurried (mkEffectFn1)
@ -93,11 +88,9 @@ make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns:
-- | Synchronously parse a CSV string
parse
:: forall @r rl @config missing extra m p
. Alternative p
=> Parallel p m
:: forall @r rl @config missing extra m
. MonadUnliftAff m
=> MonadAff m
=> MonadRec m
=> RowToList r rl
=> ReadCSVRecord r rl
=> Union config missing (Config extra)
@ -112,63 +105,28 @@ parse config csv = do
-- | Loop until the stream is closed, invoking the callback with each record as it is parsed.
foreach
:: forall @r rl x m p
. Alternative p
=> Parallel p m
=> MonadRec m
:: forall @r rl x m
. MonadUnliftAff m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
=> CSVParser r x
-> ({ | r } -> m Unit)
-> m Unit
foreach stream cb =
do
q <- liftEffect $ liftST $ Array.ST.new
foreach stream cb = do
UnliftAff unlift <- askUnliftAff
liftAff $ makeAff \res -> do
removeDataListener <- flip (Event.on dataH) stream \row -> do
cols <- liftMaybe (error "did not read header column") =<< getOrInitColumnsMap stream
record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row
launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record)
removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit)
removeErrorListener <- flip (Event.on Stream.errorH) stream (res <<< Left)
let
deque = liftEffect $ liftST $ Array.ST.shift q
enque a = liftEffect $ liftST $ Array.ST.push a q
waitReadable =
makeAff \res -> do
stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit
pure $ Canceler $ const $ liftEffect stop
processQ =
untilJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
r <- deque
isClosed <- liftEffect $ Stream.closed stream
if isClosed && isNothing r then
pure unit
else if isNothing r then do
liftAff $ delay $ wrap 10.0
empty
else do
r' <- MaybeT $ pure r
lift $ cb r'
empty
readToQ =
whileJust
$ runMaybeT
$ do
liftAff $ delay $ wrap 0.0
guard =<< not <$> liftEffect (Stream.closed stream)
isReadable <- liftEffect $ Stream.readable stream
liftAff $ when (not isReadable) waitReadable
liftEffect $ Effect.untilE do
r <- read @r stream
void $ for r enque
pure $ isNothing r
guard =<< not <$> liftEffect (Stream.closed stream)
pure unit
parSequence_ [readToQ, processQ]
pure $ Canceler $ const $ liftEffect do
removeDataListener
removeEndListener
removeErrorListener
-- | Reads a parsed record from the stream.
-- |
@ -188,10 +146,8 @@ read stream = runMaybeT do
-- | Collect all parsed records into an array
readAll
:: forall @r rl a m p
. Alternative p
=> Parallel p m
=> MonadRec m
:: forall @r rl a m
. MonadUnliftAff m
=> MonadAff m
=> RowToList r rl
=> ReadCSVRecord r rl
@ -203,7 +159,7 @@ readAll stream = do
liftEffect $ liftST $ Array.ST.unsafeFreeze records
-- | `data` event. Emitted when a CSV record has been parsed.
dataH :: forall r a. EventHandle1 (CSVParser r a) { | r }
dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String)
dataH = EventHandle "data" mkEffectFn1
-- | FFI