fix: improve performance
This commit is contained in:
parent
e87d81cc1d
commit
2b16a38a66
@ -18,7 +18,7 @@ workspace:
|
|||||||
- newtype: ">=5.0.0 <6.0.0"
|
- newtype: ">=5.0.0 <6.0.0"
|
||||||
- node-buffer: ">=9.0.0 <10.0.0"
|
- node-buffer: ">=9.0.0 <10.0.0"
|
||||||
- node-event-emitter: ">=3.0.0 <4.0.0"
|
- node-event-emitter: ">=3.0.0 <4.0.0"
|
||||||
- node-stream-pipes: ">=1.2.3 <2.0.0"
|
- node-stream-pipes: ">=1.3.0 <2.0.0"
|
||||||
- node-streams: ">=9.0.0 <10.0.0"
|
- node-streams: ">=9.0.0 <10.0.0"
|
||||||
- nullable: ">=6.0.0 <7.0.0"
|
- nullable: ">=6.0.0 <7.0.0"
|
||||||
- numbers: ">=9.0.1 <10.0.0"
|
- numbers: ">=9.0.1 <10.0.0"
|
||||||
@ -607,8 +607,8 @@ packages:
|
|||||||
- effect
|
- effect
|
||||||
node-stream-pipes:
|
node-stream-pipes:
|
||||||
type: registry
|
type: registry
|
||||||
version: 1.2.3
|
version: 1.3.0
|
||||||
integrity: sha256-lXD3x6+p72uBrRHGHrob2jrrBDakhhZE9O9EYE4aFiE=
|
integrity: sha256-5Jpf0BLn0ExQWYxbTTewai4M8quEmEVHxihc9CM1Juo=
|
||||||
dependencies:
|
dependencies:
|
||||||
- aff
|
- aff
|
||||||
- arrays
|
- arrays
|
||||||
|
@ -10,6 +10,7 @@ package:
|
|||||||
strict: true
|
strict: true
|
||||||
pedanticPackages: true
|
pedanticPackages: true
|
||||||
dependencies:
|
dependencies:
|
||||||
|
- node-stream-pipes: ">=1.3.0 <2.0.0"
|
||||||
- aff: ">=7.1.0 <8.0.0"
|
- aff: ">=7.1.0 <8.0.0"
|
||||||
- arrays: ">=7.3.0 <8.0.0"
|
- arrays: ">=7.3.0 <8.0.0"
|
||||||
- bifunctors: ">=6.0.0 <7.0.0"
|
- bifunctors: ">=6.0.0 <7.0.0"
|
||||||
@ -25,7 +26,6 @@ package:
|
|||||||
- newtype: ">=5.0.0 <6.0.0"
|
- newtype: ">=5.0.0 <6.0.0"
|
||||||
- node-buffer: ">=9.0.0 <10.0.0"
|
- node-buffer: ">=9.0.0 <10.0.0"
|
||||||
- node-event-emitter: ">=3.0.0 <4.0.0"
|
- node-event-emitter: ">=3.0.0 <4.0.0"
|
||||||
- node-stream-pipes: ">=1.2.3 <2.0.0"
|
|
||||||
- node-streams: ">=9.0.0 <10.0.0"
|
- node-streams: ">=9.0.0 <10.0.0"
|
||||||
- nullable: ">=6.0.0 <7.0.0"
|
- nullable: ">=6.0.0 <7.0.0"
|
||||||
- numbers: ">=9.0.1 <10.0.0"
|
- numbers: ">=9.0.1 <10.0.0"
|
||||||
|
@ -2,9 +2,9 @@ module Pipes.CSV where
|
|||||||
|
|
||||||
import Prelude
|
import Prelude
|
||||||
|
|
||||||
import Control.Monad.Error.Class (class MonadThrow, liftEither)
|
import Control.Monad.Error.Class (liftEither)
|
||||||
import Control.Monad.Except (runExcept)
|
import Control.Monad.Except (runExcept)
|
||||||
import Control.Monad.Rec.Class (class MonadRec, forever)
|
import Control.Monad.Rec.Class (forever)
|
||||||
import Control.Monad.ST.Global as ST
|
import Control.Monad.ST.Global as ST
|
||||||
import Control.Monad.ST.Ref as STRef
|
import Control.Monad.ST.Ref as STRef
|
||||||
import Data.Array as Array
|
import Data.Array as Array
|
||||||
@ -14,9 +14,9 @@ import Data.FunctorWithIndex (mapWithIndex)
|
|||||||
import Data.Map as Map
|
import Data.Map as Map
|
||||||
import Data.Maybe (Maybe(..))
|
import Data.Maybe (Maybe(..))
|
||||||
import Data.Tuple.Nested ((/\))
|
import Data.Tuple.Nested ((/\))
|
||||||
import Effect.Aff.Class (class MonadAff)
|
import Effect.Aff (Aff)
|
||||||
import Effect.Class (liftEffect)
|
import Effect.Class (liftEffect)
|
||||||
import Effect.Exception (Error, error)
|
import Effect.Exception (error)
|
||||||
import Node.Buffer (Buffer)
|
import Node.Buffer (Buffer)
|
||||||
import Node.Stream.CSV.Parse as CSV.Parse
|
import Node.Stream.CSV.Parse as CSV.Parse
|
||||||
import Node.Stream.CSV.Stringify as CSV.Stringify
|
import Node.Stream.CSV.Stringify as CSV.Stringify
|
||||||
@ -47,13 +47,10 @@ import Type.Prelude (Proxy(..))
|
|||||||
-- | rows `shouldEqual` [{id: 1, foo: "hi", is_deleted: false}, {id: 2, foo: "bye", is_deleted: true}]
|
-- | rows `shouldEqual` [{id: 1, foo: "hi", is_deleted: false}, {id: 2, foo: "bye", is_deleted: true}]
|
||||||
-- | ```
|
-- | ```
|
||||||
parse
|
parse
|
||||||
:: forall @r rl m
|
:: forall @r rl
|
||||||
. MonadAff m
|
. RowToList r rl
|
||||||
=> MonadThrow Error m
|
|
||||||
=> MonadRec m
|
|
||||||
=> RowToList r rl
|
|
||||||
=> ReadCSVRecord r rl
|
=> ReadCSVRecord r rl
|
||||||
=> Pipe (Maybe Buffer) (Maybe { | r }) m Unit
|
=> Pipe (Maybe Buffer) (Maybe { | r }) Aff Unit
|
||||||
parse = do
|
parse = do
|
||||||
raw <- liftEffect $ CSV.Parse.make {}
|
raw <- liftEffect $ CSV.Parse.make {}
|
||||||
colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing
|
colsST <- liftEffect $ ST.toEffect $ STRef.new Nothing
|
||||||
@ -77,14 +74,14 @@ parse = do
|
|||||||
|
|
||||||
-- | Transforms buffer chunks of a CSV file to parsed
|
-- | Transforms buffer chunks of a CSV file to parsed
|
||||||
-- | arrays of CSV values.
|
-- | arrays of CSV values.
|
||||||
parseRaw :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe (Array String)) m Unit
|
parseRaw :: Pipe (Maybe Buffer) (Maybe (Array String)) Aff Unit
|
||||||
parseRaw = do
|
parseRaw = do
|
||||||
s <- liftEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
|
s <- liftEffect $ CSV.Parse.toObjectStream <$> CSV.Parse.make {}
|
||||||
Pipes.Stream.fromTransform s
|
Pipes.Stream.fromTransform s
|
||||||
|
|
||||||
-- | Transforms CSV rows into stringified CSV records
|
-- | Transforms CSV rows into stringified CSV records
|
||||||
-- | using the given ordered array of column names.
|
-- | using the given ordered array of column names.
|
||||||
stringifyRaw :: forall m. MonadAff m => MonadThrow Error m => Array String -> Pipe (Maybe (Array String)) (Maybe String) m Unit
|
stringifyRaw :: Array String -> Pipe (Maybe (Array String)) (Maybe String) Aff Unit
|
||||||
stringifyRaw columns = do
|
stringifyRaw columns = do
|
||||||
s <- liftEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {}
|
s <- liftEffect $ CSV.Stringify.toObjectStream <$> CSV.Stringify.make columns {}
|
||||||
Pipes.Stream.fromTransform s
|
Pipes.Stream.fromTransform s
|
||||||
@ -92,7 +89,7 @@ stringifyRaw columns = do
|
|||||||
-- | Transforms purescript records into stringified CSV records.
|
-- | Transforms purescript records into stringified CSV records.
|
||||||
-- |
|
-- |
|
||||||
-- | Columns are inferred from the record's keys, ordered alphabetically.
|
-- | Columns are inferred from the record's keys, ordered alphabetically.
|
||||||
stringify :: forall m r rl. MonadRec m => MonadAff m => MonadThrow Error m => WriteCSVRecord r rl => RowToList r rl => Keys rl => Pipe (Maybe { | r }) (Maybe String) m Unit
|
stringify :: forall r rl. WriteCSVRecord r rl => RowToList r rl => Keys rl => Pipe (Maybe { | r }) (Maybe String) Aff Unit
|
||||||
stringify = do
|
stringify = do
|
||||||
raw <- liftEffect $ CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {}
|
raw <- liftEffect $ CSV.Stringify.make (Array.fromFoldable $ keys $ Proxy @r) {}
|
||||||
let
|
let
|
||||||
|
@ -10,5 +10,5 @@ import Test.Spec.Reporter (specReporter)
|
|||||||
import Test.Spec.Runner (defaultConfig, runSpec')
|
import Test.Spec.Runner (defaultConfig, runSpec')
|
||||||
|
|
||||||
main :: Effect Unit
|
main :: Effect Unit
|
||||||
main = launchAff_ $ runSpec' (defaultConfig { timeout = Nothing }) [ specReporter ] do
|
main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do
|
||||||
Test.Pipes.CSV.spec
|
Test.Pipes.CSV.spec
|
||||||
|
@ -6,22 +6,26 @@ import Control.Monad.Gen (chooseInt)
|
|||||||
import Control.Monad.Rec.Class (Step(..), tailRecM)
|
import Control.Monad.Rec.Class (Step(..), tailRecM)
|
||||||
import Data.Array as Array
|
import Data.Array as Array
|
||||||
import Data.DateTime (DateTime)
|
import Data.DateTime (DateTime)
|
||||||
import Data.Foldable (fold)
|
import Data.Foldable (fold, sum)
|
||||||
import Data.Maybe (Maybe(..), fromJust)
|
import Data.Maybe (Maybe(..), fromJust)
|
||||||
import Data.Newtype (wrap)
|
import Data.Newtype (wrap)
|
||||||
import Data.PreciseDateTime (fromRFC3339String, toDateTimeLossy)
|
import Data.PreciseDateTime (fromRFC3339String, toDateTimeLossy)
|
||||||
|
import Data.String.CodePoints as String.CodePoints
|
||||||
|
import Data.Tuple.Nested ((/\))
|
||||||
import Effect.Class (liftEffect)
|
import Effect.Class (liftEffect)
|
||||||
|
import Effect.Console (log)
|
||||||
import Node.Encoding (Encoding(..))
|
import Node.Encoding (Encoding(..))
|
||||||
import Partial.Unsafe (unsafePartial)
|
import Partial.Unsafe (unsafePartial)
|
||||||
import Pipes (yield, (>->))
|
import Pipes (yield, (>->))
|
||||||
import Pipes.CSV as Pipes.CSV
|
import Pipes.CSV as Pipes.CSV
|
||||||
import Pipes.Collect as Pipes.Collect
|
import Pipes.Collect as Pipes.Collect
|
||||||
|
import Pipes.Construct as Pipes.Construct
|
||||||
import Pipes.Node.Buffer as Pipes.Buffer
|
import Pipes.Node.Buffer as Pipes.Buffer
|
||||||
import Pipes.Node.Stream as Pipes.Stream
|
import Pipes.Node.Stream as Pipes.Stream
|
||||||
import Pipes.Prelude (map, toListM) as Pipes
|
import Pipes.Prelude (chain, map, toListM) as Pipes
|
||||||
import Pipes.Util as Pipes.Util
|
import Pipes.Util as Pipes.Util
|
||||||
import Test.QuickCheck.Gen (randomSample')
|
import Test.QuickCheck.Gen (randomSample')
|
||||||
import Test.Spec (Spec, describe, it)
|
import Test.Spec (Spec, before, describe, it)
|
||||||
import Test.Spec.Assertions (shouldEqual)
|
import Test.Spec.Assertions (shouldEqual)
|
||||||
|
|
||||||
csv :: String
|
csv :: String
|
||||||
@ -62,27 +66,23 @@ spec =
|
|||||||
, { id: 2, foo: "apple", flag: false, created: dt "2024-02-02T08:00:00Z" }
|
, { id: 2, foo: "apple", flag: false, created: dt "2024-02-02T08:00:00Z" }
|
||||||
, { id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z" }
|
, { id: 3, foo: "hello", flag: true, created: dt "1970-01-01T00:00:00Z" }
|
||||||
]
|
]
|
||||||
it "parses large csv" do
|
before
|
||||||
|
(do
|
||||||
nums <- liftEffect $ randomSample' 100000 (chooseInt 0 9)
|
nums <- liftEffect $ randomSample' 100000 (chooseInt 0 9)
|
||||||
let
|
let
|
||||||
csvRows = [ "id\n" ] <> ((_ <> "\n") <$> show <$> nums)
|
chars = [ "i","d","\n" ] <> join ((\n -> [show n, "\n"]) <$> nums)
|
||||||
csv' =
|
bufs <- Pipes.Collect.toArray
|
||||||
let
|
$ Pipes.Stream.withEOS (Pipes.Construct.eachArray chars)
|
||||||
go ix
|
>-> Pipes.Util.chunked 1000
|
||||||
| Just a <- Array.index csvRows ix = yield a $> Loop (ix + 1)
|
>-> Pipes.Stream.inEOS (Pipes.map fold >-> Pipes.Buffer.fromString UTF8)
|
||||||
| otherwise = pure $ Done unit
|
>-> Pipes.Stream.unEOS
|
||||||
in
|
pure $ nums /\ bufs
|
||||||
tailRecM go 0
|
)
|
||||||
in16kbChunks =
|
$ it "parses large csv" \(nums /\ bufs) -> do
|
||||||
Pipes.Util.chunked 16000
|
|
||||||
>-> Pipes.Stream.inEOS (Pipes.map fold)
|
|
||||||
>-> Pipes.Stream.inEOS (Pipes.Buffer.fromString UTF8)
|
|
||||||
|
|
||||||
rows <-
|
rows <-
|
||||||
Pipes.Collect.toArray
|
Pipes.Collect.toArray
|
||||||
$ Pipes.Stream.withEOS csv'
|
$ Pipes.Stream.withEOS (Pipes.Construct.eachArray bufs)
|
||||||
>-> in16kbChunks
|
>-> Pipes.CSV.parse @(id :: Int)
|
||||||
>-> Pipes.CSV.parse
|
|
||||||
>-> Pipes.Stream.unEOS
|
>-> Pipes.Stream.unEOS
|
||||||
|
|
||||||
rows `shouldEqual` ((\id -> { id }) <$> nums)
|
rows `shouldEqual` ((\id -> { id }) <$> nums)
|
||||||
|
Loading…
Reference in New Issue
Block a user