diff --git a/spago.lock b/spago.lock index 6880388..169ebe3 100644 --- a/spago.lock +++ b/spago.lock @@ -18,19 +18,18 @@ workspace: - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - - node-stream-pipes: ">=1.6.0 <2.0.0" + - node-stream-pipes: ">=2.0.2 <3.0.0" - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - numbers: ">=9.0.1 <10.0.0" - ordered-collections: ">=3.2.0 <4.0.0" - - pipes: ">=8.0.0 <9.0.0" - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" + - profunctor - record: ">=4.0.0 <5.0.0" - record-extra: ">=5.0.1 <6.0.0" - st: ">=6.2.0 <7.0.0" - strings: ">=6.0.1 <7.0.0" - - tailrec: ">=6.1.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0" - tuples: ">=7.0.0 <8.0.0" - typelevel-prelude: ">=7.0.0 <8.0.0" @@ -624,29 +623,36 @@ packages: - unsafe-coerce node-stream-pipes: type: registry - version: 1.6.0 - integrity: sha256-aYwtrkJgTzLEaYZNel2HLISaWecyBzyKoGVpZpIRTJI= + version: 2.0.2 + integrity: sha256-IwkFgzWVwqjZkQRLYBGaRukKqYIw2I7wKHwIXRcdBWI= dependencies: - aff - arrays + - console + - control + - datetime - effect - either - exceptions - foldable-traversable - foreign-object + - fork - lists - maybe - mmorph + - newtype - node-buffer - node-event-emitter - node-fs - node-path - node-streams - node-zlib + - now - ordered-collections - parallel - pipes - prelude + - profunctor - st - strings - tailrec diff --git a/spago.yaml b/spago.yaml index 7c51bb0..b27d20b 100644 --- a/spago.yaml +++ b/spago.yaml @@ -10,7 +10,7 @@ package: strict: true pedanticPackages: true dependencies: - - node-stream-pipes: ">=1.6.0 <2.0.0" + - profunctor - aff: ">=7.1.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0" - bifunctors: ">=6.0.0 <7.0.0" @@ -26,18 +26,17 @@ package: - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" + - node-stream-pipes: ">=2.0.2 <3.0.0" - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - numbers: ">=9.0.1 <10.0.0" - ordered-collections: ">=3.2.0 <4.0.0" - - pipes: ">=8.0.0 <9.0.0" - precise-datetime: ">=7.0.0 <8.0.0" - prelude: ">=6.0.1 <7.0.0" - record: ">=4.0.0 <5.0.0" - record-extra: ">=5.0.1 <6.0.0" - st: ">=6.2.0 <7.0.0" - strings: ">=6.0.1 <7.0.0" - - tailrec: ">=6.1.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0" - tuples: ">=7.0.0 <8.0.0" - typelevel-prelude: ">=7.0.0 <8.0.0" diff --git a/src/Pipes.CSV.purs b/src/Pipes.CSV.purs index 9ac747c..b1345e7 100644 --- a/src/Pipes.CSV.purs +++ b/src/Pipes.CSV.purs @@ -4,15 +4,18 @@ import Prelude import Control.Monad.Error.Class (liftEither) import Control.Monad.Except (runExcept) -import Control.Monad.Rec.Class (forever) +import Control.Monad.ST.Global (Global) import Control.Monad.ST.Global as ST +import Control.Monad.ST.Ref (STRef) import Control.Monad.ST.Ref as STRef import Data.Array as Array import Data.Bifunctor (lmap) import Data.CSV.Record (class ReadCSVRecord, class WriteCSVRecord, readCSVRecord, writeCSVRecord) import Data.FunctorWithIndex (mapWithIndex) +import Data.Map (Map) import Data.Map as Map import Data.Maybe (Maybe(..)) +import Data.Profunctor as Pro import Data.Tuple.Nested ((/\)) import Effect.Aff (Aff) import Effect.Class (liftEffect) @@ -20,79 +23,80 @@ import Effect.Exception (error) import Node.Buffer (Buffer) import Node.Stream.CSV.Parse as CSV.Parse import Node.Stream.CSV.Stringify as CSV.Stringify -import Pipes (await, yield, (>->)) -import Pipes.Core (Pipe) +import Pipes.Async (AsyncPipe(..), ReadResult(..)) +import Pipes.Node.Stream (TransformContext) import Pipes.Node.Stream as Pipes.Stream import Prim.RowList (class RowToList) import Record.Extra (class Keys, keys) import Type.Prelude (Proxy(..)) +type ParseContext = + { colsST :: STRef Global (Maybe (Map String Int)) + , t :: TransformContext Buffer (Array String) + } + -- | Transforms buffer chunks of a CSV file to parsed -- | records of `r`. --- | --- | ``` --- | -- == my-data.csv.gz == --- | -- id,foo,is_deleted --- | -- 1,hi,f --- | -- 2,bye,t --- | --- | rows --- | :: Array {id :: Int, foo :: String, is_deleted :: Boolean} --- | <- map Array.fromFoldable --- | $ Pipes.toListM --- | $ Pipes.Node.Stream.unEOS --- | $ Pipes.Node.FS.read "my-data.csv.gz" --- | >-> Pipes.Node.Zlib.gunzip --- | >-> Pipes.CSV.parse --- | rows `shouldEqual` [{id: 1, foo: "hi", is_deleted: false}, {id: 2, foo: "bye", is_deleted: true}] --- | ``` parse :: forall @r rl . RowToList r rl => ReadCSVRecord r rl - => Pipe (Maybe Buffer) (Maybe { | r }) Aff Unit -parse = do - raw <- liftEffect $ CSV.Parse.make {} - colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing - + => AsyncPipe ParseContext Aff (Maybe Buffer) (Maybe { | r }) +parse = let - readCols = liftEffect $ ST.toEffect $ STRef.read colsST - putCols a = void $ liftEffect $ ST.toEffect $ STRef.write (Just a) colsST + readCols st = liftEffect $ ST.toEffect $ STRef.read st + putCols st a = void $ liftEffect $ ST.toEffect $ STRef.write (Just a) st - parse' a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a - firstRow a = putCols $ Map.fromFoldable $ mapWithIndex (flip (/\)) a - row a cols' = yield =<< parse' a cols' - unmarshal = forever do - r <- await - cols <- readCols - case cols of - Just cols' -> row r cols' - Nothing -> firstRow r + firstRow st a = putCols st $ Map.fromFoldable $ mapWithIndex (flip (/\)) a + row a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a - parser = Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream raw - parser >-> Pipes.Stream.inEOS unmarshal + (AsyncPipe init' write' awaitWrite' read' awaitRead') = + Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream <$> CSV.Parse.make {} + + init = do + t <- init' + colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing + pure {t, colsST} + + write {t} a = write' t a + awaitWrite {t} = awaitWrite' t + awaitRead {t} = awaitRead' t + + read {colsST, t} = do + r <- read' t + case r of + ReadWouldBlock -> pure ReadWouldBlock + ReadOk Nothing -> pure $ ReadOk Nothing + ReadOk (Just r') -> do + cols <- readCols colsST + case cols of + Just cols' -> ReadOk <$> Just <$> row r' cols' + Nothing -> firstRow colsST r' *> read {colsST, t} + in + AsyncPipe + init + write + awaitWrite + read + awaitRead -- | Transforms buffer chunks of a CSV file to parsed -- | arrays of CSV values. -parseRaw :: Pipe (Maybe Buffer) (Maybe (Array String)) Aff Unit +parseRaw :: AsyncPipe (TransformContext Buffer (Array String)) Aff (Maybe Buffer) (Maybe (Array String)) parseRaw = do - s <- liftEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {} - Pipes.Stream.fromTransform s + Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream <$> CSV.Parse.make {} -- | Transforms CSV rows into stringified CSV records -- | using the given ordered array of column names. -stringifyRaw :: Array String -> Pipe (Maybe (Array String)) (Maybe String) Aff Unit -stringifyRaw columns = do - s <- liftEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {} - Pipes.Stream.fromTransform s +stringifyRaw :: Array String -> AsyncPipe (TransformContext (Array String) String) Aff (Maybe (Array String)) (Maybe String) +stringifyRaw columns = + Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {} -- | Transforms purescript records into stringified CSV records. -- | -- | Columns are inferred from the record's keys, ordered alphabetically. -stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => Pipe (Maybe { | r }) (Maybe String) Aff Unit +stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => AsyncPipe (TransformContext (Array String) String) Aff (Maybe { | r }) (Maybe String) stringify = do - raw <- liftEffect $ CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {} let - printer = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream raw - marshal = forever $ yield =<< (writeCSVRecord @r @rl <$> await) - Pipes.Stream.inEOS marshal >-> printer + p = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {} + Pro.lcmap (map $ writeCSVRecord @r @rl) p diff --git a/test/Test/Pipes.CSV.purs b/test/Test/Pipes.CSV.purs index d17353a..4ad9d8d 100644 --- a/test/Test/Pipes.CSV.purs +++ b/test/Test/Pipes.CSV.purs @@ -17,6 +17,7 @@ import Effect.Console (log) import Node.Encoding (Encoding(..)) import Partial.Unsafe (unsafePartial) import Pipes (yield, (>->)) +import Pipes.Async ((>-/->)) import Pipes.CSV as Pipes.CSV import Pipes.Collect as Pipes.Collect import Pipes.Construct as Pipes.Construct @@ -50,7 +51,7 @@ spec = , { id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z" } ] - csv' <- map fold $ Pipes.Collect.toArray $ Pipes.Stream.withEOS (Pipes.Construct.eachArray objs) >-> Pipes.CSV.stringify >-> Pipes.Stream.unEOS + csv' <- map fold $ Pipes.Collect.toArray $ Pipes.Stream.withEOS (Pipes.Construct.eachArray objs) >-/-> Pipes.CSV.stringify >-> Pipes.Stream.unEOS csv' `shouldEqual` csv describe "parse" do it "parses csv" do @@ -58,7 +59,7 @@ spec = $ Pipes.toListM $ Pipes.Stream.withEOS (yield csv) >-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8) - >-> Pipes.CSV.parse + >-/-> Pipes.CSV.parse >-> Pipes.Stream.unEOS rows `shouldEqual` @@ -82,7 +83,7 @@ spec = rows <- Pipes.Collect.toArray $ Pipes.Stream.withEOS (Pipes.Construct.eachArray bufs) - >-> Pipes.CSV.parse @(id :: Int) + >-/-> Pipes.CSV.parse @(id :: Int) >-> Pipes.Stream.unEOS rows `shouldEqual` ((\id -> { id }) <$> nums)