diff --git a/spago.lock b/spago.lock index 384591c..66b773e 100644 --- a/spago.lock +++ b/spago.lock @@ -25,7 +25,6 @@ workspace: - nullable: ">=6.0.0 <7.0.0" - numbers: ">=9.0.1 <10.0.0" - ordered-collections: ">=3.2.0 <4.0.0" - - parallel: ">=6.0.0 <7.0.0" - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" - record: ">=4.0.0 <5.0.0" @@ -35,6 +34,7 @@ workspace: - tailrec: ">=6.1.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0" - typelevel-prelude: ">=7.0.0 <8.0.0" + - unlift: ">=1.0.1 <2.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0" test_dependencies: - console @@ -43,6 +43,7 @@ workspace: - arraybuffer-types - arrays - bifunctors + - catenable-lists - console - const - contravariant @@ -61,6 +62,8 @@ workspace: - foreign - foreign-object - formatters + - free + - freet - functions - functors - gen @@ -71,6 +74,7 @@ workspace: - lazy - lists - maybe + - monad-control - newtype - node-buffer - node-event-emitter @@ -100,6 +104,7 @@ workspace: - typelevel-prelude - unfoldable - unicode + - unlift - unsafe-coerce extra_packages: {} packages: @@ -159,6 +164,18 @@ packages: - newtype - prelude - tuples + catenable-lists: + type: registry + version: 7.0.0 + integrity: sha256-76vYENhwF4BWTBsjeLuErCH2jqVT4M3R1HX+4RwSftA= + dependencies: + - control + - foldable-traversable + - lists + - maybe + - prelude + - tuples + - unfoldable console: type: registry version: 6.1.0 @@ -352,6 +369,40 @@ packages: - parsing - prelude - transformers + free: + type: registry + version: 7.1.0 + integrity: sha256-JAumgEsGSzJCNLD8AaFvuX7CpqS5yruCngi6yI7+V5k= + dependencies: + - catenable-lists + - control + - distributive + - either + - exists + - foldable-traversable + - invariant + - lazy + - maybe + - prelude + - tailrec + - transformers + - tuples + - unsafe-coerce + freet: + type: registry + version: 7.0.0 + integrity: sha256-zkL6wU4ZPq8xz1kGFxoliWqyhBksepMJTyA68VEBaJo= + dependencies: + - aff + - bifunctors + - effect + - either + - exists + - free + - prelude + - tailrec + - transformers + - tuples functions: type: registry version: 6.0.0 @@ -461,6 +512,15 @@ packages: - invariant - newtype - prelude + monad-control: + type: registry + version: 5.0.0 + integrity: sha256-bgfDW30wbIm70NR1Tvvh9P+VFQMDh1wK2sSJXCj/dZc= + dependencies: + - aff + - freet + - identity + - lists newtype: type: registry version: 5.0.0 @@ -784,6 +844,23 @@ packages: - foldable-traversable - maybe - strings + unlift: + type: registry + version: 1.0.1 + integrity: sha256-nbBCVV0fZz/3UHKoW11dcTwBYmQOIgK31ht2BN47RPw= + dependencies: + - aff + - effect + - either + - freet + - identity + - lists + - maybe + - monad-control + - prelude + - st + - transformers + - tuples unsafe-coerce: type: registry version: 6.0.0 diff --git a/spago.yaml b/spago.yaml index dcf38d9..13711ee 100644 --- a/spago.yaml +++ b/spago.yaml @@ -32,7 +32,6 @@ package: - nullable: ">=6.0.0 <7.0.0" - numbers: ">=9.0.1 <10.0.0" - ordered-collections: ">=3.2.0 <4.0.0" - - parallel: ">=6.0.0 <7.0.0" - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" - record: ">=4.0.0 <5.0.0" @@ -42,6 +41,7 @@ package: - tailrec: ">=6.1.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0" - typelevel-prelude: ">=7.0.0 <8.0.0" + - unlift: ">=1.0.1 <2.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0" test: main: Test.Main diff --git a/src/Node.Stream.CSV.Parse.purs b/src/Node.Stream.CSV.Parse.purs index ffecbf3..1e0fee8 100644 --- a/src/Node.Stream.CSV.Parse.purs +++ b/src/Node.Stream.CSV.Parse.purs @@ -3,15 +3,12 @@ module Node.Stream.CSV.Parse where import Prelude hiding (join) import Control.Alt ((<|>)) -import Control.Alternative (guard, empty) -import Control.Monad.Error.Class (liftEither) +import Control.Monad.Error.Class (liftEither, liftMaybe) import Control.Monad.Except (runExcept) +import Control.Monad.Except.Trans (catchError) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) -import Control.Monad.Rec.Class (class MonadRec, untilJust, whileJust) import Control.Monad.ST.Class (liftST) import Control.Monad.Trans.Class (lift) -import Control.MonadPlus (class Alternative) -import Control.Parallel (class Parallel, parSequence_) import Data.Array as Array import Data.Array.ST as Array.ST import Data.Bifunctor (lmap) @@ -20,15 +17,13 @@ import Data.Either (Either(..)) import Data.Filterable (filter) import Data.Map (Map) import Data.Map as Map -import Data.Maybe (Maybe(..), isNothing) -import Data.Newtype (wrap) +import Data.Maybe (Maybe(..)) import Data.Nullable (Nullable) import Data.Nullable as Nullable -import Data.Traversable (for) import Effect (Effect) -import Effect as Effect -import Effect.Aff (Canceler(..), delay, makeAff) +import Effect.Aff (Canceler(..), launchAff_, makeAff) import Effect.Aff.Class (class MonadAff, liftAff) +import Effect.Aff.Unlift (class MonadUnliftAff, UnliftAff(..), askUnliftAff) import Effect.Class (liftEffect) import Effect.Exception (error) import Effect.Uncurried (mkEffectFn1) @@ -93,11 +88,9 @@ make = makeImpl <<< unsafeToForeign <<< Object.union (recordToForeign { columns: -- | Synchronously parse a CSV string parse - :: forall @r rl @config missing extra m p - . Alternative p - => Parallel p m + :: forall @r rl @config missing extra m + . MonadUnliftAff m => MonadAff m - => MonadRec m => RowToList r rl => ReadCSVRecord r rl => Union config missing (Config extra) @@ -112,64 +105,29 @@ parse config csv = do -- | Loop until the stream is closed, invoking the callback with each record as it is parsed. foreach - :: forall @r rl x m p - . Alternative p - => Parallel p m - => MonadRec m + :: forall @r rl x m + . MonadUnliftAff m => MonadAff m => RowToList r rl => ReadCSVRecord r rl => CSVParser r x -> ({ | r } -> m Unit) -> m Unit -foreach stream cb = - do - q <- liftEffect $ liftST $ Array.ST.new +foreach stream cb = do + UnliftAff unlift <- askUnliftAff + liftAff $ makeAff \res -> do + removeDataListener <- flip (Event.on dataH) stream \row -> do + cols <- liftMaybe (error "did not read header column") =<< getOrInitColumnsMap stream + record <- liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols row + launchAff_ $ flip catchError (liftEffect <<< res <<< Left) (unlift $ cb record) + removeEndListener <- flip (Event.once Stream.endH) stream (res $ Right unit) + removeErrorListener <- flip (Event.on Stream.errorH) stream (res <<< Left) - let - deque = liftEffect $ liftST $ Array.ST.shift q - enque a = liftEffect $ liftST $ Array.ST.push a q + pure $ Canceler $ const $ liftEffect do + removeDataListener + removeEndListener + removeErrorListener - waitReadable = - makeAff \res -> do - stop <- flip (Event.once Stream.readableH) stream $ res $ Right unit - pure $ Canceler $ const $ liftEffect stop - - processQ = - untilJust - $ runMaybeT - $ do - liftAff $ delay $ wrap 0.0 - r <- deque - isClosed <- liftEffect $ Stream.closed stream - if isClosed && isNothing r then - pure unit - else if isNothing r then do - liftAff $ delay $ wrap 10.0 - empty - else do - r' <- MaybeT $ pure r - lift $ cb r' - empty - - readToQ = - whileJust - $ runMaybeT - $ do - liftAff $ delay $ wrap 0.0 - guard =<< not <$> liftEffect (Stream.closed stream) - isReadable <- liftEffect $ Stream.readable stream - liftAff $ when (not isReadable) waitReadable - - liftEffect $ Effect.untilE do - r <- read @r stream - void $ for r enque - pure $ isNothing r - guard =<< not <$> liftEffect (Stream.closed stream) - pure unit - - parSequence_ [readToQ, processQ] - -- | Reads a parsed record from the stream. -- | -- | Returns `Nothing` when either: @@ -188,10 +146,8 @@ read stream = runMaybeT do -- | Collect all parsed records into an array readAll - :: forall @r rl a m p - . Alternative p - => Parallel p m - => MonadRec m + :: forall @r rl a m + . MonadUnliftAff m => MonadAff m => RowToList r rl => ReadCSVRecord r rl @@ -203,7 +159,7 @@ readAll stream = do liftEffect $ liftST $ Array.ST.unsafeFreeze records -- | `data` event. Emitted when a CSV record has been parsed. -dataH :: forall r a. EventHandle1 (CSVParser r a) { | r } +dataH :: forall r a. EventHandle1 (CSVParser r a) (Array String) dataH = EventHandle "data" mkEffectFn1 -- | FFI