diff --git a/spago.lock b/spago.lock index 4a2d92a..4fdbc49 100644 --- a/spago.lock +++ b/spago.lock @@ -19,12 +19,13 @@ workspace: - maybe: ">=6.0.0 <7.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - - node-stream-pipes: ">=1.6.0 <2.0.0" + - node-stream-pipes: ">=2.0.2 <3.0.0" - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - ordered-collections: ">=3.2.0 <4.0.0" - - pipes: ">=8.0.0 <9.0.0" + - pipes - prelude: ">=6.0.1 <7.0.0" + - profunctor - record: ">=4.0.0 <5.0.0" - simple-json: ">=9.0.0 <10.0.0" - tailrec: ">=6.1.0 <7.0.0" @@ -632,29 +633,36 @@ packages: - effect node-stream-pipes: type: registry - version: 1.6.0 - integrity: sha256-aYwtrkJgTzLEaYZNel2HLISaWecyBzyKoGVpZpIRTJI= + version: 2.0.2 + integrity: sha256-IwkFgzWVwqjZkQRLYBGaRukKqYIw2I7wKHwIXRcdBWI= dependencies: - aff - arrays + - console + - control + - datetime - effect - either - exceptions - foldable-traversable - foreign-object + - fork - lists - maybe - mmorph + - newtype - node-buffer - node-event-emitter - node-fs - node-path - node-streams - node-zlib + - now - ordered-collections - parallel - pipes - prelude + - profunctor - st - strings - tailrec diff --git a/spago.yaml b/spago.yaml index 0852473..de9eef2 100644 --- a/spago.yaml +++ b/spago.yaml @@ -10,6 +10,8 @@ package: strict: true pedanticPackages: true dependencies: + - pipes + - profunctor - aff: ">=7.1.0 <8.0.0" - arrays: ">=7.3.0 <8.0.0" - bifunctors: ">=6.0.0 <7.0.0" @@ -26,11 +28,10 @@ package: - maybe: ">=6.0.0 <7.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - - node-stream-pipes: ">=1.6.0 <2.0.0" + - node-stream-pipes: ">=2.0.2 <3.0.0" - node-streams: ">=9.0.0 <10.0.0" - nullable: ">=6.0.0 <7.0.0" - ordered-collections: ">=3.2.0 <4.0.0" - - pipes: ">=8.0.0 <9.0.0" - prelude: ">=6.0.1 <7.0.0" - record: ">=4.0.0 <5.0.0" - simple-json: ">=9.0.0 <10.0.0" diff --git a/src/Node.Stream.CBOR.Decode.js b/src/Node.Stream.CBOR.Decode.js index ba4d4a9..6de9555 100644 --- a/src/Node.Stream.CBOR.Decode.js +++ b/src/Node.Stream.CBOR.Decode.js @@ -1,7 +1,7 @@ import { DecoderStream } from "cbor-x"; /** @type {(s: import('cbor-x').Options) => () => DecoderStream} */ -export const makeImpl = (c) => () => new DecoderStream({useRecords: false, ...c}); +export const makeImpl = (c) => () => new DecoderStream({useRecords: false, ...c, allowHalfOpen: true}); /** @type {(s: DecoderStream) => () => unknown | null} */ export const readImpl = (p) => () => p.read(); diff --git a/src/Node.Stream.CBOR.Encode.js b/src/Node.Stream.CBOR.Encode.js index 003cfbd..a2177f6 100644 --- a/src/Node.Stream.CBOR.Encode.js +++ b/src/Node.Stream.CBOR.Encode.js @@ -1,7 +1,7 @@ import { EncoderStream } from "cbor-x"; /** @type {(s: import('cbor-x').Options) => () => EncoderStream} */ -export const makeImpl = (c) => () => new EncoderStream({useRecords: false, ...c}); +export const makeImpl = (c) => () => new EncoderStream({useRecords: false, ...c, allowHalfOpen: true}); /** @type {(s: EncoderStream) => (a: unknown) => () => void} */ export const writeImpl = (s) => (a) => () => s.write(a); diff --git a/src/Pipes.CBOR.purs b/src/Pipes.CBOR.purs index 41a71ae..a9a9140 100644 --- a/src/Pipes.CBOR.purs +++ b/src/Pipes.CBOR.purs @@ -4,19 +4,24 @@ import Prelude import Control.Monad.Error.Class (class MonadThrow, liftEither) import Control.Monad.Except (runExcept) -import Control.Monad.Rec.Class (class MonadRec, forever) +import Control.Monad.Rec.Class (class MonadRec) import Data.Bifunctor (lmap) import Data.CBOR (class ReadCBOR, class WriteCBOR, readCBOR, writeCBOR) +import Data.Either (Either) import Data.Maybe (Maybe) +import Data.Profunctor as Pro +import Data.Traversable (traverse) import Effect.Aff.Class (class MonadAff) -import Effect.Class (liftEffect) import Effect.Exception (Error, error) +import Foreign (Foreign, MultipleErrors) import Node.Buffer (Buffer) import Node.Stream.CBOR.Decode as CBOR.Decode import Node.Stream.CBOR.Encode as CBOR.Encode -import Pipes (await, yield, (>->)) +import Pipes.Async (AsyncPipe) import Pipes.Core (Pipe) +import Pipes.Node.Stream (TransformContext) import Pipes.Node.Stream as Pipes.Stream +import Pipes.Prelude as Pipe -- | Transforms buffer chunks of a CBOR file to parsed values -- | of type `a`. @@ -26,15 +31,14 @@ decode => MonadAff m => MonadThrow Error m => ReadCBOR a - => Pipe (Maybe Buffer) (Maybe a) m Unit + => AsyncPipe (TransformContext Buffer Foreign) m (Maybe Buffer) (Maybe (Either MultipleErrors a)) decode = do - raw <- liftEffect $ CBOR.Decode.make {} let - unmarshal = forever do - r <- await - yield =<< liftEither (lmap (error <<< show) $ runExcept $ readCBOR @a r) - parser = Pipes.Stream.fromTransform $ CBOR.Decode.toObjectStream raw - parser >-> Pipes.Stream.inEOS unmarshal + parser = Pipes.Stream.fromTransform $ CBOR.Decode.toObjectStream <$> CBOR.Decode.make {} + Pro.rmap (map (runExcept <<< readCBOR @a)) parser + +decodeError :: forall m a r. MonadThrow Error m => Pipe (Maybe (Either MultipleErrors a)) (Maybe a) m r +decodeError = Pipe.mapM (traverse liftEither <<< map (lmap $ error <<< show)) -- | Encode purescript values as CBOR buffers encode @@ -43,10 +47,9 @@ encode => MonadThrow Error m => MonadRec m => WriteCBOR a - => Pipe (Maybe a) (Maybe Buffer) m Unit -encode = do - raw <- liftEffect $ CBOR.Encode.make {} + => AsyncPipe (TransformContext Foreign Buffer) m (Maybe a) (Maybe Buffer) +encode = let - printer = Pipes.Stream.fromTransform $ CBOR.Encode.toObjectStream raw - marshal = forever $ yield =<< (writeCBOR <$> await) - Pipes.Stream.inEOS marshal >-> printer + p = Pipes.Stream.fromTransform $ CBOR.Encode.toObjectStream <$> CBOR.Encode.make {} + in + Pro.lcmap (map writeCBOR) p diff --git a/test/Test/Pipes.CSV.purs b/test/Test/Pipes.CSV.purs index add3685..86e3e4f 100644 --- a/test/Test/Pipes.CSV.purs +++ b/test/Test/Pipes.CSV.purs @@ -3,28 +3,32 @@ module Test.Pipes.CBOR where import Prelude import Control.Monad.Cont (lift) +import Control.Monad.Error.Class (liftEither) import Control.Monad.Gen (chooseInt) -import Data.Array as Array +import Data.Bifunctor (lmap) import Data.DateTime (DateTime) import Data.List ((:)) import Data.List as List import Data.Maybe (Maybe(..), fromJust) import Data.Newtype (wrap) import Data.PreciseDateTime (fromRFC3339String, toDateTimeLossy) +import Data.Traversable (traverse) import Data.Tuple.Nested ((/\)) import Effect (Effect) import Effect.Aff (delay) import Effect.CBOR as CBOR import Effect.Class (liftEffect) +import Effect.Exception (error) import Node.Buffer (Buffer) import Node.Buffer as Buffer import Node.Encoding (Encoding(..)) import Partial.Unsafe (unsafePartial) import Pipes (yield, (>->)) +import Pipes.Async (debug, (>-/->)) import Pipes.CBOR as Pipes.CBOR import Pipes.Collect as Pipes.Collect import Pipes.Node.Stream as Pipes.Stream -import Pipes.Prelude (toListM) as Pipes +import Pipes.Prelude (mapM, toListM) as Pipes import Test.QuickCheck.Gen (randomSample') import Test.Spec (Spec, before, describe, it) import Test.Spec.Assertions (shouldEqual) @@ -52,7 +56,7 @@ spec = bytes <- Pipes.Collect.toBuffer $ Pipes.Stream.withEOS (yield exp) - >-> Pipes.CBOR.encode + >-/-> Pipes.CBOR.encode >-> Pipes.Stream.unEOS act <- liftEffect $ CBOR.decode bytes act `shouldEqual` exp @@ -61,8 +65,9 @@ spec = it "parses csv" do buf <- liftEffect $ cborBuf rows <- Pipes.toListM - $ Pipes.Stream.withEOS (yield buf *> lift (delay $ wrap 10.0)) - >-> Pipes.CBOR.decode + $ (yield (Just buf) *> yield Nothing) + >-/-> debug "cbor" Pipes.CBOR.decode + >-> Pipes.CBOR.decodeError rows `shouldEqual` ((Just exp) : Nothing : List.Nil) before @@ -73,7 +78,7 @@ spec = bytes <- Pipes.Collect.toBuffer $ Pipes.Stream.withEOS (yield objs) - >-> Pipes.CBOR.encode + >-/-> Pipes.CBOR.encode >-> Pipes.Stream.unEOS pure $ nums /\ bytes ) @@ -81,7 +86,8 @@ spec = rows <- Pipes.Collect.toArray $ Pipes.Stream.withEOS (yield bytes) - >-> Pipes.CBOR.decode @(Array {id :: Int}) + >-/-> Pipes.CBOR.decode @(Array {id :: Int}) + >-> Pipes.CBOR.decodeError >-> Pipes.Stream.unEOS rows `shouldEqual` [(\id -> { id }) <$> nums]