fix: bump node-stream-pipes to 2.0.2 (big perf improvement)

This commit is contained in:
orion 2024-06-22 19:57:56 -05:00
parent 9c86da438c
commit 52af3cadae
Signed by: orion
GPG Key ID: 6D4165AE4C928719
4 changed files with 71 additions and 61 deletions

View File

@ -18,19 +18,18 @@ workspace:
- newtype: ">=5.0.0 <6.0.0" - newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=1.6.0 <2.0.0" - node-stream-pipes: ">=2.0.2 <3.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0" - numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- pipes: ">=8.0.0 <9.0.0"
- precise-datetime: ">=7.0.0 <8.0.0" - precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- profunctor
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"
- record-extra: ">=5.0.1 <6.0.0" - record-extra: ">=5.0.1 <6.0.0"
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0" - tuples: ">=7.0.0 <8.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0" - typelevel-prelude: ">=7.0.0 <8.0.0"
@ -624,29 +623,36 @@ packages:
- unsafe-coerce - unsafe-coerce
node-stream-pipes: node-stream-pipes:
type: registry type: registry
version: 1.6.0 version: 2.0.2
integrity: sha256-aYwtrkJgTzLEaYZNel2HLISaWecyBzyKoGVpZpIRTJI= integrity: sha256-IwkFgzWVwqjZkQRLYBGaRukKqYIw2I7wKHwIXRcdBWI=
dependencies: dependencies:
- aff - aff
- arrays - arrays
- console
- control
- datetime
- effect - effect
- either - either
- exceptions - exceptions
- foldable-traversable - foldable-traversable
- foreign-object - foreign-object
- fork
- lists - lists
- maybe - maybe
- mmorph - mmorph
- newtype
- node-buffer - node-buffer
- node-event-emitter - node-event-emitter
- node-fs - node-fs
- node-path - node-path
- node-streams - node-streams
- node-zlib - node-zlib
- now
- ordered-collections - ordered-collections
- parallel - parallel
- pipes - pipes
- prelude - prelude
- profunctor
- st - st
- strings - strings
- tailrec - tailrec

View File

@ -10,7 +10,7 @@ package:
strict: true strict: true
pedanticPackages: true pedanticPackages: true
dependencies: dependencies:
- node-stream-pipes: ">=1.6.0 <2.0.0" - profunctor
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0" - bifunctors: ">=6.0.0 <7.0.0"
@ -26,18 +26,17 @@ package:
- newtype: ">=5.0.0 <6.0.0" - newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=2.0.2 <3.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- numbers: ">=9.0.1 <10.0.0" - numbers: ">=9.0.1 <10.0.0"
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- pipes: ">=8.0.0 <9.0.0"
- precise-datetime: ">=7.0.0 <8.0.0" - precise-datetime: ">=7.0.0 <8.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"
- record-extra: ">=5.0.1 <6.0.0" - record-extra: ">=5.0.1 <6.0.0"
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0" - tuples: ">=7.0.0 <8.0.0"
- typelevel-prelude: ">=7.0.0 <8.0.0" - typelevel-prelude: ">=7.0.0 <8.0.0"

View File

@ -4,15 +4,18 @@ import Prelude
import Control.Monad.Error.Class (liftEither) import Control.Monad.Error.Class (liftEither)
import Control.Monad.Except (runExcept) import Control.Monad.Except (runExcept)
import Control.Monad.Rec.Class (forever) import Control.Monad.ST.Global (Global)
import Control.Monad.ST.Global as ST import Control.Monad.ST.Global as ST
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
import Data.Array as Array import Data.Array as Array
import Data.Bifunctor (lmap) import Data.Bifunctor (lmap)
import Data.CSV.Record (class ReadCSVRecord, class WriteCSVRecord, readCSVRecord, writeCSVRecord) import Data.CSV.Record (class ReadCSVRecord, class WriteCSVRecord, readCSVRecord, writeCSVRecord)
import Data.FunctorWithIndex (mapWithIndex) import Data.FunctorWithIndex (mapWithIndex)
import Data.Map (Map)
import Data.Map as Map import Data.Map as Map
import Data.Maybe (Maybe(..)) import Data.Maybe (Maybe(..))
import Data.Profunctor as Pro
import Data.Tuple.Nested ((/\)) import Data.Tuple.Nested ((/\))
import Effect.Aff (Aff) import Effect.Aff (Aff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
@ -20,79 +23,80 @@ import Effect.Exception (error)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.Stream.CSV.Parse as CSV.Parse import Node.Stream.CSV.Parse as CSV.Parse
import Node.Stream.CSV.Stringify as CSV.Stringify import Node.Stream.CSV.Stringify as CSV.Stringify
import Pipes (await, yield, (>->)) import Pipes.Async (AsyncPipe(..), ReadResult(..))
import Pipes.Core (Pipe) import Pipes.Node.Stream (TransformContext)
import Pipes.Node.Stream as Pipes.Stream import Pipes.Node.Stream as Pipes.Stream
import Prim.RowList (class RowToList) import Prim.RowList (class RowToList)
import Record.Extra (class Keys, keys) import Record.Extra (class Keys, keys)
import Type.Prelude (Proxy(..)) import Type.Prelude (Proxy(..))
type ParseContext =
{ colsST :: STRef Global (Maybe (Map String Int))
, t :: TransformContext Buffer (Array String)
}
-- | Transforms buffer chunks of a CSV file to parsed -- | Transforms buffer chunks of a CSV file to parsed
-- | records of `r`. -- | records of `r`.
-- |
-- | ```
-- | -- == my-data.csv.gz ==
-- | -- id,foo,is_deleted
-- | -- 1,hi,f
-- | -- 2,bye,t
-- |
-- | rows
-- | :: Array {id :: Int, foo :: String, is_deleted :: Boolean}
-- | <- map Array.fromFoldable
-- | $ Pipes.toListM
-- | $ Pipes.Node.Stream.unEOS
-- | $ Pipes.Node.FS.read "my-data.csv.gz"
-- | >-> Pipes.Node.Zlib.gunzip
-- | >-> Pipes.CSV.parse
-- | rows `shouldEqual` [{id: 1, foo: "hi", is_deleted: false}, {id: 2, foo: "bye", is_deleted: true}]
-- | ```
parse parse
:: forall @r rl :: forall @r rl
. RowToList r rl . RowToList r rl
=> ReadCSVRecord r rl => ReadCSVRecord r rl
=> Pipe (Maybe Buffer) (Maybe { | r }) Aff Unit => AsyncPipe ParseContext Aff (Maybe Buffer) (Maybe { | r })
parse = do parse =
raw <- liftEffect $ CSV.Parse.make {}
colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing
let let
readCols = liftEffect $ ST.toEffect $ STRef.read colsST readCols st = liftEffect $ ST.toEffect $ STRef.read st
putCols a = void $ liftEffect $ ST.toEffect $ STRef.write (Just a) colsST putCols st a = void $ liftEffect $ ST.toEffect $ STRef.write (Just a) st
parse' a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a firstRow st a = putCols st $ Map.fromFoldable $ mapWithIndex (flip (/\)) a
firstRow a = putCols $ Map.fromFoldable $ mapWithIndex (flip (/\)) a row a cols' = liftEither $ lmap (error <<< show) $ runExcept $ readCSVRecord @r @rl cols' a
row a cols' = yield =<< parse' a cols'
unmarshal = forever do
r <- await
cols <- readCols
case cols of
Just cols' -> row r cols'
Nothing -> firstRow r
parser = Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream raw (AsyncPipe init' write' awaitWrite' read' awaitRead') =
parser >-> Pipes.Stream.inEOS unmarshal Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
init = do
t <- init'
colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing
pure {t, colsST}
write {t} a = write' t a
awaitWrite {t} = awaitWrite' t
awaitRead {t} = awaitRead' t
read {colsST, t} = do
r <- read' t
case r of
ReadWouldBlock -> pure ReadWouldBlock
ReadOk Nothing -> pure $ ReadOk Nothing
ReadOk (Just r') -> do
cols <- readCols colsST
case cols of
Just cols' -> ReadOk <$> Just <$> row r' cols'
Nothing -> firstRow colsST r' *> read {colsST, t}
in
AsyncPipe
init
write
awaitWrite
read
awaitRead
-- | Transforms buffer chunks of a CSV file to parsed -- | Transforms buffer chunks of a CSV file to parsed
-- | arrays of CSV values. -- | arrays of CSV values.
parseRaw :: Pipe (Maybe Buffer) (Maybe (Array String)) Aff Unit parseRaw :: AsyncPipe (TransformContext Buffer (Array String)) Aff (Maybe Buffer) (Maybe (Array String))
parseRaw = do parseRaw = do
s <- liftEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {} Pipes.Stream.fromTransform $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
Pipes.Stream.fromTransform s
-- | Transforms CSV rows into stringified CSV records -- | Transforms CSV rows into stringified CSV records
-- | using the given ordered array of column names. -- | using the given ordered array of column names.
stringifyRaw :: Array String -> Pipe (Maybe (Array String)) (Maybe String) Aff Unit stringifyRaw :: Array String -> AsyncPipe (TransformContext (Array String) String) Aff (Maybe (Array String)) (Maybe String)
stringifyRaw columns = do stringifyRaw columns =
s <- liftEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {} Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {}
Pipes.Stream.fromTransform s
-- | Transforms purescript records into stringified CSV records. -- | Transforms purescript records into stringified CSV records.
-- | -- |
-- | Columns are inferred from the record's keys, ordered alphabetically. -- | Columns are inferred from the record's keys, ordered alphabetically.
stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => Pipe (Maybe { | r }) (Maybe String) Aff Unit stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => AsyncPipe (TransformContext (Array String) String) Aff (Maybe { | r }) (Maybe String)
stringify = do stringify = do
raw <- liftEffect $ CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {}
let let
printer = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream raw p = Pipes.Stream.fromTransform $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {}
marshal = forever $ yield =<< (writeCSVRecord @r @rl <$> await) Pro.lcmap (map $ writeCSVRecord @r @rl) p
Pipes.Stream.inEOS marshal >-> printer

View File

@ -17,6 +17,7 @@ import Effect.Console (log)
import Node.Encoding (Encoding(..)) import Node.Encoding (Encoding(..))
import Partial.Unsafe (unsafePartial) import Partial.Unsafe (unsafePartial)
import Pipes (yield, (>->)) import Pipes (yield, (>->))
import Pipes.Async ((>-/->))
import Pipes.CSV as Pipes.CSV import Pipes.CSV as Pipes.CSV
import Pipes.Collect as Pipes.Collect import Pipes.Collect as Pipes.Collect
import Pipes.Construct as Pipes.Construct import Pipes.Construct as Pipes.Construct
@ -50,7 +51,7 @@ spec =
, { id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z" } , { id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z" }
] ]
csv' <- map fold $ Pipes.Collect.toArray $ Pipes.Stream.withEOS (Pipes.Construct.eachArray objs) >-> Pipes.CSV.stringify >-> Pipes.Stream.unEOS csv' <- map fold $ Pipes.Collect.toArray $ Pipes.Stream.withEOS (Pipes.Construct.eachArray objs) >-/-> Pipes.CSV.stringify >-> Pipes.Stream.unEOS
csv' `shouldEqual` csv csv' `shouldEqual` csv
describe "parse" do describe "parse" do
it "parses csv" do it "parses csv" do
@ -58,7 +59,7 @@ spec =
$ Pipes.toListM $ Pipes.toListM
$ Pipes.Stream.withEOS (yield csv) $ Pipes.Stream.withEOS (yield csv)
>-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8) >-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8)
>-> Pipes.CSV.parse >-/-> Pipes.CSV.parse
>-> Pipes.Stream.unEOS >-> Pipes.Stream.unEOS
rows `shouldEqual` rows `shouldEqual`
@ -82,7 +83,7 @@ spec =
rows <- rows <-
Pipes.Collect.toArray Pipes.Collect.toArray
$ Pipes.Stream.withEOS (Pipes.Construct.eachArray bufs) $ Pipes.Stream.withEOS (Pipes.Construct.eachArray bufs)
>-> Pipes.CSV.parse @(id :: Int) >-/-> Pipes.CSV.parse @(id :: Int)
>-> Pipes.Stream.unEOS >-> Pipes.Stream.unEOS
rows `shouldEqual` ((\id -> { id }) <$> nums) rows `shouldEqual` ((\id -> { id }) <$> nums)