fix: bump node-stream-pipes to 2.0.2 (big perf improvement)

This commit is contained in:
orion 2024-06-22 19:37:18 -05:00
parent 1aab2e1dd1
commit bf9fa5e1e9
Signed by: orion
GPG Key ID: 6D4165AE4C928719
6 changed files with 49 additions and 31 deletions

View File

@ -19,12 +19,13 @@ workspace:
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=1.6.0 <2.0.0" - node-stream-pipes: ">=2.0.2 <3.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- profunctor
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"
- simple-json: ">=9.0.0 <10.0.0" - simple-json: ">=9.0.0 <10.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"
@ -632,29 +633,36 @@ packages:
- effect - effect
node-stream-pipes: node-stream-pipes:
type: registry type: registry
version: 1.6.0 version: 2.0.2
integrity: sha256-aYwtrkJgTzLEaYZNel2HLISaWecyBzyKoGVpZpIRTJI= integrity: sha256-IwkFgzWVwqjZkQRLYBGaRukKqYIw2I7wKHwIXRcdBWI=
dependencies: dependencies:
- aff - aff
- arrays - arrays
- console
- control
- datetime
- effect - effect
- either - either
- exceptions - exceptions
- foldable-traversable - foldable-traversable
- foreign-object - foreign-object
- fork
- lists - lists
- maybe - maybe
- mmorph - mmorph
- newtype
- node-buffer - node-buffer
- node-event-emitter - node-event-emitter
- node-fs - node-fs
- node-path - node-path
- node-streams - node-streams
- node-zlib - node-zlib
- now
- ordered-collections - ordered-collections
- parallel - parallel
- pipes - pipes
- prelude - prelude
- profunctor
- st - st
- strings - strings
- tailrec - tailrec

View File

@ -10,6 +10,8 @@ package:
strict: true strict: true
pedanticPackages: true pedanticPackages: true
dependencies: dependencies:
- pipes
- profunctor
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- arrays: ">=7.3.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0"
- bifunctors: ">=6.0.0 <7.0.0" - bifunctors: ">=6.0.0 <7.0.0"
@ -26,11 +28,10 @@ package:
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-stream-pipes: ">=1.6.0 <2.0.0" - node-stream-pipes: ">=2.0.2 <3.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- nullable: ">=6.0.0 <7.0.0" - nullable: ">=6.0.0 <7.0.0"
- ordered-collections: ">=3.2.0 <4.0.0" - ordered-collections: ">=3.2.0 <4.0.0"
- pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- record: ">=4.0.0 <5.0.0" - record: ">=4.0.0 <5.0.0"
- simple-json: ">=9.0.0 <10.0.0" - simple-json: ">=9.0.0 <10.0.0"

View File

@ -1,7 +1,7 @@
import { DecoderStream } from "cbor-x"; import { DecoderStream } from "cbor-x";
/** @type {(s: import('cbor-x').Options) => () => DecoderStream} */ /** @type {(s: import('cbor-x').Options) => () => DecoderStream} */
export const makeImpl = (c) => () => new DecoderStream({useRecords: false, ...c}); export const makeImpl = (c) => () => new DecoderStream({useRecords: false, ...c, allowHalfOpen: true});
/** @type {(s: DecoderStream) => () => unknown | null} */ /** @type {(s: DecoderStream) => () => unknown | null} */
export const readImpl = (p) => () => p.read(); export const readImpl = (p) => () => p.read();

View File

@ -1,7 +1,7 @@
import { EncoderStream } from "cbor-x"; import { EncoderStream } from "cbor-x";
/** @type {(s: import('cbor-x').Options) => () => EncoderStream} */ /** @type {(s: import('cbor-x').Options) => () => EncoderStream} */
export const makeImpl = (c) => () => new EncoderStream({useRecords: false, ...c}); export const makeImpl = (c) => () => new EncoderStream({useRecords: false, ...c, allowHalfOpen: true});
/** @type {(s: EncoderStream) => (a: unknown) => () => void} */ /** @type {(s: EncoderStream) => (a: unknown) => () => void} */
export const writeImpl = (s) => (a) => () => s.write(a); export const writeImpl = (s) => (a) => () => s.write(a);

View File

@ -4,19 +4,24 @@ import Prelude
import Control.Monad.Error.Class (class MonadThrow, liftEither) import Control.Monad.Error.Class (class MonadThrow, liftEither)
import Control.Monad.Except (runExcept) import Control.Monad.Except (runExcept)
import Control.Monad.Rec.Class (class MonadRec, forever) import Control.Monad.Rec.Class (class MonadRec)
import Data.Bifunctor (lmap) import Data.Bifunctor (lmap)
import Data.CBOR (class ReadCBOR, class WriteCBOR, readCBOR, writeCBOR) import Data.CBOR (class ReadCBOR, class WriteCBOR, readCBOR, writeCBOR)
import Data.Either (Either)
import Data.Maybe (Maybe) import Data.Maybe (Maybe)
import Data.Profunctor as Pro
import Data.Traversable (traverse)
import Effect.Aff.Class (class MonadAff) import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error, error) import Effect.Exception (Error, error)
import Foreign (Foreign, MultipleErrors)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.Stream.CBOR.Decode as CBOR.Decode import Node.Stream.CBOR.Decode as CBOR.Decode
import Node.Stream.CBOR.Encode as CBOR.Encode import Node.Stream.CBOR.Encode as CBOR.Encode
import Pipes (await, yield, (>->)) import Pipes.Async (AsyncPipe)
import Pipes.Core (Pipe) import Pipes.Core (Pipe)
import Pipes.Node.Stream (TransformContext)
import Pipes.Node.Stream as Pipes.Stream import Pipes.Node.Stream as Pipes.Stream
import Pipes.Prelude as Pipe
-- | Transforms buffer chunks of a CBOR file to parsed values -- | Transforms buffer chunks of a CBOR file to parsed values
-- | of type `a`. -- | of type `a`.
@ -26,15 +31,14 @@ decode
=> MonadAff m => MonadAff m
=> MonadThrow Error m => MonadThrow Error m
=> ReadCBOR a => ReadCBOR a
=> Pipe (Maybe Buffer) (Maybe a) m Unit => AsyncPipe (TransformContext Buffer Foreign) m (Maybe Buffer) (Maybe (Either MultipleErrors a))
decode = do decode = do
raw <- liftEffect $ CBOR.Decode.make {}
let let
unmarshal = forever do parser = Pipes.Stream.fromTransform $ CBOR.Decode.toObjectStream <$> CBOR.Decode.make {}
r <- await Pro.rmap (map (runExcept <<< readCBOR @a)) parser
yield =<< liftEither (lmap (error <<< show) $ runExcept $ readCBOR @a r)
parser = Pipes.Stream.fromTransform $ CBOR.Decode.toObjectStream raw decodeError :: forall m a r. MonadThrow Error m => Pipe (Maybe (Either MultipleErrors a)) (Maybe a) m r
parser >-> Pipes.Stream.inEOS unmarshal decodeError = Pipe.mapM (traverse liftEither <<< map (lmap $ error <<< show))
-- | Encode purescript values as CBOR buffers -- | Encode purescript values as CBOR buffers
encode encode
@ -43,10 +47,9 @@ encode
=> MonadThrow Error m => MonadThrow Error m
=> MonadRec m => MonadRec m
=> WriteCBOR a => WriteCBOR a
=> Pipe (Maybe a) (Maybe Buffer) m Unit => AsyncPipe (TransformContext Foreign Buffer) m (Maybe a) (Maybe Buffer)
encode = do encode =
raw <- liftEffect $ CBOR.Encode.make {}
let let
printer = Pipes.Stream.fromTransform $ CBOR.Encode.toObjectStream raw p = Pipes.Stream.fromTransform $ CBOR.Encode.toObjectStream <$> CBOR.Encode.make {}
marshal = forever $ yield =<< (writeCBOR <$> await) in
Pipes.Stream.inEOS marshal >-> printer Pro.lcmap (map writeCBOR) p

View File

@ -3,28 +3,32 @@ module Test.Pipes.CBOR where
import Prelude import Prelude
import Control.Monad.Cont (lift) import Control.Monad.Cont (lift)
import Control.Monad.Error.Class (liftEither)
import Control.Monad.Gen (chooseInt) import Control.Monad.Gen (chooseInt)
import Data.Array as Array import Data.Bifunctor (lmap)
import Data.DateTime (DateTime) import Data.DateTime (DateTime)
import Data.List ((:)) import Data.List ((:))
import Data.List as List import Data.List as List
import Data.Maybe (Maybe(..), fromJust) import Data.Maybe (Maybe(..), fromJust)
import Data.Newtype (wrap) import Data.Newtype (wrap)
import Data.PreciseDateTime (fromRFC3339String, toDateTimeLossy) import Data.PreciseDateTime (fromRFC3339String, toDateTimeLossy)
import Data.Traversable (traverse)
import Data.Tuple.Nested ((/\)) import Data.Tuple.Nested ((/\))
import Effect (Effect) import Effect (Effect)
import Effect.Aff (delay) import Effect.Aff (delay)
import Effect.CBOR as CBOR import Effect.CBOR as CBOR
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (error)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.Buffer as Buffer import Node.Buffer as Buffer
import Node.Encoding (Encoding(..)) import Node.Encoding (Encoding(..))
import Partial.Unsafe (unsafePartial) import Partial.Unsafe (unsafePartial)
import Pipes (yield, (>->)) import Pipes (yield, (>->))
import Pipes.Async (debug, (>-/->))
import Pipes.CBOR as Pipes.CBOR import Pipes.CBOR as Pipes.CBOR
import Pipes.Collect as Pipes.Collect import Pipes.Collect as Pipes.Collect
import Pipes.Node.Stream as Pipes.Stream import Pipes.Node.Stream as Pipes.Stream
import Pipes.Prelude (toListM) as Pipes import Pipes.Prelude (mapM, toListM) as Pipes
import Test.QuickCheck.Gen (randomSample') import Test.QuickCheck.Gen (randomSample')
import Test.Spec (Spec, before, describe, it) import Test.Spec (Spec, before, describe, it)
import Test.Spec.Assertions (shouldEqual) import Test.Spec.Assertions (shouldEqual)
@ -52,7 +56,7 @@ spec =
bytes bytes
<- Pipes.Collect.toBuffer <- Pipes.Collect.toBuffer
$ Pipes.Stream.withEOS (yield exp) $ Pipes.Stream.withEOS (yield exp)
>-> Pipes.CBOR.encode >-/-> Pipes.CBOR.encode
>-> Pipes.Stream.unEOS >-> Pipes.Stream.unEOS
act <- liftEffect $ CBOR.decode bytes act <- liftEffect $ CBOR.decode bytes
act `shouldEqual` exp act `shouldEqual` exp
@ -61,8 +65,9 @@ spec =
it "parses csv" do it "parses csv" do
buf <- liftEffect $ cborBuf buf <- liftEffect $ cborBuf
rows <- Pipes.toListM rows <- Pipes.toListM
$ Pipes.Stream.withEOS (yield buf *> lift (delay $ wrap 10.0)) $ (yield (Just buf) *> yield Nothing)
>-> Pipes.CBOR.decode >-/-> debug "cbor" Pipes.CBOR.decode
>-> Pipes.CBOR.decodeError
rows `shouldEqual` ((Just exp) : Nothing : List.Nil) rows `shouldEqual` ((Just exp) : Nothing : List.Nil)
before before
@ -73,7 +78,7 @@ spec =
bytes <- bytes <-
Pipes.Collect.toBuffer Pipes.Collect.toBuffer
$ Pipes.Stream.withEOS (yield objs) $ Pipes.Stream.withEOS (yield objs)
>-> Pipes.CBOR.encode >-/-> Pipes.CBOR.encode
>-> Pipes.Stream.unEOS >-> Pipes.Stream.unEOS
pure $ nums /\ bytes pure $ nums /\ bytes
) )
@ -81,7 +86,8 @@ spec =
rows <- rows <-
Pipes.Collect.toArray Pipes.Collect.toArray
$ Pipes.Stream.withEOS (yield bytes) $ Pipes.Stream.withEOS (yield bytes)
>-> Pipes.CBOR.decode @(Array {id :: Int}) >-/-> Pipes.CBOR.decode @(Array {id :: Int})
>-> Pipes.CBOR.decodeError
>-> Pipes.Stream.unEOS >-> Pipes.Stream.unEOS
rows `shouldEqual` [(\id -> { id }) <$> nums] rows `shouldEqual` [(\id -> { id }) <$> nums]