From 805f3b8887a070d430b397640bfc8132a3ec7e0c Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Fri, 10 May 2024 15:04:09 -0500 Subject: [PATCH] feat: Pipes.Node.FS, Pipes.Node.Buffer, etc. --- spago.lock | 25 +++- spago.yaml | 12 +- src/Control.Monad.Cleanup.Class.purs | 24 ++++ src/Node.Stream.Object.purs | 59 +++++---- src/Pipes.Node.Buffer.purs | 17 +++ src/Pipes.Node.FS.purs | 57 +++++++++ src/Pipes.Node.Stream.purs | 184 ++++++++++++++++----------- src/Pipes.Node.Zlib.purs | 40 ++++++ src/Pipes.String.purs | 43 +++++++ src/Pipes.Util.purs | 32 +++++ test/Test/Common.purs | 38 ++++++ test/Test/Main.purs | 8 +- test/Test/Pipes.Node.Buffer.purs | 69 ++++++++++ test/Test/Pipes.Node.FS.purs | 86 +++++++++++++ test/Test/Pipes.Node.Stream.js | 15 +++ test/Test/Pipes.Node.Stream.purs | 126 ++++++++---------- 16 files changed, 664 insertions(+), 171 deletions(-) create mode 100644 src/Control.Monad.Cleanup.Class.purs create mode 100644 src/Pipes.Node.Buffer.purs create mode 100644 src/Pipes.Node.FS.purs create mode 100644 src/Pipes.Node.Zlib.purs create mode 100644 src/Pipes.String.purs create mode 100644 src/Pipes.Util.purs create mode 100644 test/Test/Common.purs create mode 100644 test/Test/Pipes.Node.Buffer.purs create mode 100644 test/Test/Pipes.Node.FS.purs diff --git a/spago.lock b/spago.lock index e78dc8c..904bdf6 100644 --- a/spago.lock +++ b/spago.lock @@ -4,19 +4,28 @@ workspace: path: ./ dependencies: - aff: ">=7.1.0 <8.0.0" - - control: ">=6.0.0 <7.0.0" + - arrays - effect: ">=4.0.0 <5.0.0" - either: ">=6.1.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0" + - foldable-traversable - maybe: ">=6.0.0 <7.0.0" + - mmorph - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" + - node-fs + - node-path - node-streams: ">=9.0.0 <10.0.0" + - node-zlib + - parallel - pipes: ">=8.0.0 <9.0.0" - prelude: ">=6.0.1 <7.0.0" - - tailrec: ">=6.1.0 <7.0.0" + - st + - strings + - tailrec - transformers: ">=6.0.0 <7.0.0" + - tuples - unsafe-coerce: ">=6.0.0 <7.0.0" test_dependencies: - console @@ -26,6 +35,7 @@ workspace: - quickcheck - simple-json - spec + - spec-quickcheck build_plan: - aff - ansi @@ -87,6 +97,7 @@ workspace: - safe-coerce - simple-json - spec + - spec-quickcheck - st - strings - tailrec @@ -791,6 +802,16 @@ packages: - tailrec - transformers - tuples + spec-quickcheck: + type: registry + version: 5.0.0 + integrity: sha256-iE0iThqZCuDGe3pwg5RvqcL8E5cRQ4txDuloCclOsCs= + dependencies: + - aff + - prelude + - quickcheck + - random + - spec st: type: registry version: 6.2.0 diff --git a/spago.yaml b/spago.yaml index 0c4b0bb..13ed7e8 100644 --- a/spago.yaml +++ b/spago.yaml @@ -11,19 +11,28 @@ package: pedanticPackages: true dependencies: - aff: ">=7.1.0 <8.0.0" - - control: ">=6.0.0 <7.0.0" + - arrays: ">=7.3.0 <8.0.0" - effect: ">=4.0.0 <5.0.0" - either: ">=6.1.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0" + - foldable-traversable: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0" + - mmorph: ">=7.0.0 <8.0.0" - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" + - node-fs: ">=9.1.0 <10.0.0" + - node-path: ">=5.0.0 <6.0.0" - node-streams: ">=9.0.0 <10.0.0" + - node-zlib: ">=0.4.0 <0.5.0" + - parallel: ">=6.0.0 <7.0.0" - pipes: ">=8.0.0 <9.0.0" - prelude: ">=6.0.1 <7.0.0" + - st: ">=6.2.0 <7.0.0" + - strings: ">=6.0.1 <7.0.0" - tailrec: ">=6.1.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0" + - tuples: ">=7.0.0 <8.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0" test: main: Test.Main @@ -35,5 +44,6 @@ package: - quickcheck - simple-json - spec + - spec-quickcheck workspace: extraPackages: {} diff --git a/src/Control.Monad.Cleanup.Class.purs b/src/Control.Monad.Cleanup.Class.purs new file mode 100644 index 0000000..683a1fc --- /dev/null +++ b/src/Control.Monad.Cleanup.Class.purs @@ -0,0 +1,24 @@ +module Control.Monad.Cleanup where + +import Prelude + +import Control.Monad.Error.Class (class MonadError, liftEither, try) +import Control.Monad.State (StateT, modify_, runStateT) +import Data.Tuple.Nested ((/\)) + +type CleanupT m = StateT (m Unit) m + +finally :: forall m. Monad m => (m Unit) -> CleanupT m Unit +finally m = modify_ (_ *> m) + +runCleanup :: forall m a. Monad m => CleanupT m a -> m a +runCleanup m = do + a /\ final <- runStateT m (pure unit) + final + pure a + +runCleanupE :: forall e m a. MonadError e m => CleanupT m a -> m a +runCleanupE m = do + ea /\ final <- runStateT (try m) (pure unit) + final + liftEither ea diff --git a/src/Node.Stream.Object.purs b/src/Node.Stream.Object.purs index 30465b1..c575320 100644 --- a/src/Node.Stream.Object.purs +++ b/src/Node.Stream.Object.purs @@ -2,7 +2,16 @@ module Node.Stream.Object where import Prelude +import Control.Monad.Error.Class (liftEither) +import Control.Monad.ST.Class (liftST) +import Control.Monad.ST.Global (Global) +import Control.Monad.ST.Ref (STRef) +import Control.Monad.ST.Ref as STRef +import Control.Parallel (parOneOf) import Data.Either (Either(..)) +import Data.Generic.Rep (class Generic) +import Data.Maybe (Maybe(..)) +import Data.Show.Generic (genericShow) import Effect (Effect) import Effect.Aff (Aff, effectCanceler, makeAff) import Effect.Class (liftEffect) @@ -19,11 +28,19 @@ data ReadResult a = ReadWouldBlock | ReadClosed | ReadJust a +derive instance Generic (ReadResult a) _ +derive instance Functor ReadResult +derive instance Eq a => Eq (ReadResult a) +instance Show (ReadResult a) where + show = genericShow <<< map (const "..") data WriteResult = WriteWouldBlock | WriteClosed | WriteOk +derive instance Generic WriteResult _ +derive instance Eq WriteResult +instance Show WriteResult where show = genericShow type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a } type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult } @@ -100,6 +117,12 @@ else instance (Write s a) => Write s a where write s a = write s a end s = end s +withErrorST :: forall s. Stream s => s -> Effect {cancel :: Effect Unit, error :: STRef Global (Maybe Error)} +withErrorST s = do + error <- liftST $ STRef.new Nothing + cancel <- flip (Event.once errorH) s \e -> void $ liftST $ STRef.write (Just e) error + pure {error, cancel} + fromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer fromBufferReadable = unsafeCoerce @@ -123,34 +146,26 @@ awaitReadableOrClosed s = do closed <- liftEffect $ isClosed s ended <- liftEffect $ isReadableEnded s readable <- liftEffect $ isReadable s - when (not ended && not closed && not readable) $ makeAff \res -> do - cancelClose <- Event.once closeH (res $ Right unit) s - cancelError <- Event.once errorH (res <<< Left) s - cancelReadable <- flip (Event.once readableH) s do - cancelClose - cancelError - res $ Right unit - pure $ effectCanceler do - cancelReadable - cancelClose - cancelError + when (not ended && not closed && not readable) + $ liftEither =<< parOneOf [onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s] awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit awaitWritableOrClosed s = do closed <- liftEffect $ isClosed s ended <- liftEffect $ isWritableEnded s writable <- liftEffect $ isWritable s - when (not closed && not ended && not writable) $ makeAff \res -> do - cancelClose <- Event.once closeH (res $ Right unit) s - cancelError <- Event.once errorH (res <<< Left) s - cancelDrain <- flip (Event.once drainH) s do - cancelClose - cancelError - res $ Right unit - pure $ effectCanceler do - cancelDrain - cancelClose - cancelError + when (not ended && not closed && not writable) + $ liftEither =<< parOneOf [onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s] + +onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit +onceAff0 h emitter = makeAff \res -> do + cancel <- Event.once h (res $ Right unit) emitter + pure $ effectCanceler cancel + +onceAff1 :: forall e a. EventHandle1 e a -> e -> Aff a +onceAff1 h emitter = makeAff \res -> do + cancel <- Event.once h (res <<< Right) emitter + pure $ effectCanceler cancel readableH :: forall s a. Read s a => EventHandle0 s readableH = EventHandle "readable" identity diff --git a/src/Pipes.Node.Buffer.purs b/src/Pipes.Node.Buffer.purs new file mode 100644 index 0000000..761a884 --- /dev/null +++ b/src/Pipes.Node.Buffer.purs @@ -0,0 +1,17 @@ +module Pipes.Node.Buffer where + +import Prelude + +import Control.Monad.Morph (hoist) +import Effect.Class (class MonadEffect, liftEffect) +import Node.Buffer (Buffer) +import Node.Buffer as Buffer +import Node.Encoding (Encoding) +import Pipes.Core (Pipe) +import Pipes.Prelude as Pipes + +toString :: forall m. MonadEffect m => Encoding -> Pipe Buffer String m Unit +toString enc = hoist liftEffect $ Pipes.mapM $ Buffer.toString enc + +fromString :: forall m. MonadEffect m => Encoding -> Pipe String Buffer m Unit +fromString enc = hoist liftEffect $ Pipes.mapM $ flip Buffer.fromString enc diff --git a/src/Pipes.Node.FS.purs b/src/Pipes.Node.FS.purs new file mode 100644 index 0000000..a73616c --- /dev/null +++ b/src/Pipes.Node.FS.purs @@ -0,0 +1,57 @@ +module Pipes.Node.FS where + +import Prelude + +import Data.Maybe (Maybe) +import Effect.Aff (Aff) +import Effect.Class (liftEffect) +import Node.Buffer (Buffer) +import Node.FS.Stream (WriteStreamOptions) +import Node.FS.Stream as FS.Stream +import Node.Path (FilePath) +import Node.Stream.Object as O +import Pipes.Core (Consumer, Producer) +import Pipes.Node.Stream (fromReadable, fromWritable) +import Prim.Row (class Union) + +-- | Creates a `fs.Writable` stream for the file +-- | at the given path. +-- | +-- | Writing `Nothing` to this pipe will close the stream. +-- | +-- | See `Pipes.Node.Stream.withEOS` for converting `Producer a` +-- | into `Producer (Maybe a)`, emitting `Nothing` before exiting. +write + :: forall r trash + . Union r trash WriteStreamOptions + => Record r -> FilePath -> Consumer (Maybe Buffer) Aff Unit +write o p = do + w <- liftEffect $ FS.Stream.createWriteStream' p o + fromWritable $ O.fromBufferWritable w + +-- | Open a file in write mode, failing if the file already exists. +-- | +-- | `write {flags: "wx"}` +create :: FilePath -> Consumer (Maybe Buffer) Aff Unit +create = write {flags: "wx"} + +-- | Open a file in write mode, truncating it if the file already exists. +-- | +-- | `write {flags: "w"}` +truncate :: FilePath -> Consumer (Maybe Buffer) Aff Unit +truncate = write {flags: "w"} + +-- | Open a file in write mode, appending written contents if the file already exists. +-- | +-- | `write {flags: "a"}` +append :: FilePath -> Consumer (Maybe Buffer) Aff Unit +append = write {flags: "a"} + +-- | Creates a `fs.Readable` stream for the file at the given path. +-- | +-- | Emits `Nothing` before closing. To opt out of this behavior, +-- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`. +read :: FilePath -> Producer (Maybe Buffer) Aff Unit +read p = do + r <- liftEffect $ FS.Stream.createReadStream p + fromReadable $ O.fromBufferReadable r diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index 3dbc5e3..5712e18 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -2,94 +2,134 @@ module Pipes.Node.Stream where import Prelude -import Control.Alternative (empty) -import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) +import Control.Monad.Error.Class (throwError) import Control.Monad.Rec.Class (whileJust) +import Control.Monad.ST.Class (liftST) +import Control.Monad.ST.Ref as STRef import Control.Monad.Trans.Class (lift) -import Data.Maybe (Maybe(..)) +import Data.Maybe (Maybe(..), maybe) import Data.Newtype (wrap) +import Data.Traversable (for_) import Effect.Aff (Aff, delay) import Effect.Aff.Class (liftAff) import Effect.Class (liftEffect) import Node.Stream.Object as O -import Pipes (await, yield) -import Pipes.Core (Consumer, Pipe, Producer) -import Pipes.Internal (Proxy) -import Pipes.Internal as P.I +import Pipes (await, yield, (>->)) +import Pipes (for) as P +import Pipes.Core (Consumer, Pipe, Producer, Producer_) +import Pipes.Prelude (mapFoldable, map) as P -type ProxyFFI :: Type -> Type -> Type -> Type -> Type -> Type -> Type -type ProxyFFI a' a b' b r pipe = - { pure :: r -> pipe - , request :: a' -> (a -> pipe) -> pipe - , respond :: b -> (b' -> pipe) -> pipe - } +-- | Convert a `Readable` stream to a `Pipe`. +-- | +-- | This will yield `Nothing` before exiting, signaling +-- | End-of-stream. +fromReadable :: forall s a. O.Read s a => s -> Producer_ (Maybe a) Aff Unit +fromReadable r = + let + cleanup rmErrorListener = do + liftEffect rmErrorListener + go {error, cancel} = do + liftAff $ delay $ wrap 0.0 + err <- liftEffect $ liftST $ STRef.read error + for_ err throwError -proxyFFI :: forall m a' a b' b r. ProxyFFI a' a b' b r (Proxy a' a b' b m r) -proxyFFI = { pure: P.I.Pure, request: P.I.Request, respond: P.I.Respond } - -fromReadable :: forall s a. O.Read s a => s -> Producer (Maybe a) Aff Unit -fromReadable r = whileJust do - liftAff $ delay $ wrap 0.0 - a <- liftEffect $ O.read r - case a of - O.ReadWouldBlock -> do - lift $ O.awaitReadableOrClosed r - pure $ Just unit - O.ReadClosed -> do - yield Nothing - pure Nothing - O.ReadJust a' -> do - yield $ Just a' - pure $ Just unit + res <- liftEffect $ O.read r + case res of + O.ReadJust a -> yield (Just a) *> go {error, cancel} + O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) *> go {error, cancel} + O.ReadClosed -> yield Nothing *> cleanup cancel + in do + e <- liftEffect $ O.withErrorST r + go e +-- | Convert a `Writable` stream to a `Pipe`. +-- | +-- | When `Nothing` is piped to this, the stream will +-- | be `end`ed, and the pipe will noop if invoked again. fromWritable :: forall s a. O.Write s a => s -> Consumer (Maybe a) Aff Unit -fromWritable w = do - whileJust $ runMaybeT do - liftAff $ delay $ wrap 0.0 - a <- MaybeT await - res <- liftEffect $ O.write w a - case res of - O.WriteClosed -> empty - O.WriteOk -> pure unit - O.WriteWouldBlock -> do - liftAff $ O.awaitWritableOrClosed w - pure unit - liftEffect $ O.end w +fromWritable w = + let + cleanup rmErrorListener = do + liftEffect rmErrorListener + liftEffect $ O.end w + go {error, cancel} = do + liftAff $ delay $ wrap 0.0 + err <- liftEffect $ liftST $ STRef.read error + for_ err throwError + ma <- await + case ma of + Nothing -> cleanup cancel + Just a -> do + res <- liftEffect $ O.write w a + case res of + O.WriteOk -> go {error, cancel} + O.WriteWouldBlock -> liftAff (O.awaitWritableOrClosed w) *> go {error, cancel} + O.WriteClosed -> pure unit + in do + r <- liftEffect $ O.withErrorST w + go r + +-- | Convert a `Transform` stream to a `Pipe`. +-- | +-- | When `Nothing` is piped to this, the `Transform` stream will +-- | be `end`ed, and the pipe will noop if invoked again. fromTransform :: forall a b. O.Transform a b -> Pipe (Maybe a) (Maybe b) Aff Unit fromTransform t = let - read' {exitOnWouldBlock} = - whileJust $ runMaybeT do - liftAff $ delay $ wrap 0.0 - res <- liftEffect $ O.read t - case res of - O.ReadWouldBlock -> - if exitOnWouldBlock then do - empty - else do - liftAff $ O.awaitReadableOrClosed t - pure unit - O.ReadJust b -> do - lift $ yield $ Just b - pure unit - O.ReadClosed -> do - lift $ yield Nothing - empty - in do - whileJust $ runMaybeT do + cleanup removeErrorListener = do + liftEffect $ O.end t + liftEffect $ removeErrorListener + fromReadable t + yieldFromReadableHalf = do + res <- liftEffect (O.read t) + case res of + O.ReadJust a -> yield (Just a) + O.ReadWouldBlock -> pure unit + O.ReadClosed -> yield Nothing *> pure unit + go {error, cancel} = do liftAff $ delay $ wrap 0.0 + err <- liftEffect $ liftST $ STRef.read error + for_ err throwError - a <- MaybeT await - writeRes <- liftEffect $ O.write t a + ma <- await + case ma of + Nothing -> cleanup cancel + Just a' -> do + res <- liftEffect $ O.write t a' + yieldFromReadableHalf + case res of + O.WriteOk -> go {error, cancel} + O.WriteWouldBlock -> lift (O.awaitWritableOrClosed t) *> go {error, cancel} + O.WriteClosed -> cleanup cancel + in do + r <- liftEffect $ O.withErrorST t + go r - lift $ read' {exitOnWouldBlock: true} +-- | Given a `Producer` of values, wrap them in `Just`. +-- | +-- | Before the `Producer` exits, emits `Nothing` as an End-of-stream signal. +withEOS :: forall a. Producer a Aff Unit -> Producer (Maybe a) Aff Unit +withEOS a = do + P.for a (yield <<< Just) + yield Nothing - case writeRes of - O.WriteOk -> pure unit - O.WriteClosed -> empty - O.WriteWouldBlock -> do - liftAff $ O.awaitWritableOrClosed t - pure unit - liftEffect $ O.end t - read' {exitOnWouldBlock: false} +-- | Strip a pipeline of the EOS signal +unEOS :: forall a. Pipe (Maybe a) a Aff Unit +unEOS = P.mapFoldable identity + +-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`. +-- | +-- | Allows easily using pipes not concerned with the EOS signal with +-- | pipes that do need this signal. +-- | +-- | (ex. `Pipes.Node.Buffer.toString` doesn't need an EOS signal, but `Pipes.Node.FS.create` does.) +-- | +-- | `Just` values will be passed to the pipe, and the response(s) will be wrapped in `Just`. +-- | +-- | `Nothing` will bypass the given pipe entirely, and the pipe will not be invoked again. +inEOS :: forall a b. Pipe a b Aff Unit -> Pipe (Maybe a) (Maybe b) Aff Unit +inEOS p = whileJust do + ma <- await + maybe (yield Nothing) (\a -> yield a >-> p >-> P.map Just) ma + pure $ void ma diff --git a/src/Pipes.Node.Zlib.purs b/src/Pipes.Node.Zlib.purs new file mode 100644 index 0000000..bfbe0f8 --- /dev/null +++ b/src/Pipes.Node.Zlib.purs @@ -0,0 +1,40 @@ +module Pipes.Node.Zlib where + +import Prelude + +import Data.Maybe (Maybe) +import Effect (Effect) +import Effect.Aff (Aff) +import Effect.Class (liftEffect) +import Node.Buffer (Buffer) +import Node.Stream.Object as O +import Node.Zlib as Zlib +import Node.Zlib.Types (ZlibStream) +import Pipes.Core (Pipe) +import Pipes.Node.Stream (fromTransform) + +fromZlib :: forall r. Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit +fromZlib z = do + raw <- liftEffect $ Zlib.toDuplex <$> z + fromTransform $ O.fromBufferTransform raw + +gzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit +gzip = fromZlib Zlib.createGzip + +gunzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit +gunzip = fromZlib Zlib.createGunzip + +unzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit +unzip = fromZlib Zlib.createUnzip + +inflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit +inflate = fromZlib Zlib.createInflate + +deflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit +deflate = fromZlib Zlib.createDeflate + +brotliCompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit +brotliCompress = fromZlib Zlib.createBrotliCompress + +brotliDecompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit +brotliDecompress = fromZlib Zlib.createBrotliDecompress diff --git a/src/Pipes.String.purs b/src/Pipes.String.purs new file mode 100644 index 0000000..592fb6f --- /dev/null +++ b/src/Pipes.String.purs @@ -0,0 +1,43 @@ +module Pipes.String where + +import Prelude + +import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) +import Control.Monad.Rec.Class (whileJust) +import Control.Monad.ST.Class (liftST) +import Control.Monad.Trans.Class (lift) +import Data.Array.ST as Array.ST +import Data.Foldable (fold, traverse_) +import Data.Maybe (Maybe(..)) +import Data.String (Pattern) +import Data.String as String +import Effect.Class (class MonadEffect, liftEffect) +import Pipes (await, yield) +import Pipes.Core (Pipe) + +-- | Accumulate string chunks until `pat` is seen, then `yield` the buffered +-- | string up to (and not including) the pattern. +-- | +-- | When end-of-stream is reached, yields the remaining buffered string then `Nothing`. +-- | +-- | ``` +-- | toList $ yield "foo,bar,baz" >-> split "," +-- | -- "foo" : "bar" : "baz" : Nil +-- | ``` +split :: forall m. MonadEffect m => Pattern -> Pipe (Maybe String) (Maybe String) m Unit +split pat = do + buf <- liftEffect $ liftST $ Array.ST.new + whileJust $ runMaybeT do + chunk <- MaybeT await + case String.indexOf pat chunk of + Nothing -> void $ liftEffect $ liftST $ Array.ST.push chunk buf + Just ix -> do + let + {before, after} = String.splitAt ix chunk + len <- liftEffect $ liftST $ Array.ST.length buf + buf' <- liftEffect $ liftST $ Array.ST.splice 0 len [] buf + lift $ yield $ Just $ (fold buf') <> before + void $ liftEffect $ liftST $ Array.ST.push (String.drop 1 after) buf + buf' <- liftEffect $ liftST $ Array.ST.unsafeFreeze buf + traverse_ yield (Just <$> String.split pat (fold buf')) + yield Nothing diff --git a/src/Pipes.Util.purs b/src/Pipes.Util.purs new file mode 100644 index 0000000..b378de2 --- /dev/null +++ b/src/Pipes.Util.purs @@ -0,0 +1,32 @@ +module Pipes.Util where + +import Prelude + +import Control.Monad.Rec.Class (whileJust) +import Control.Monad.ST.Class (liftST) +import Control.Monad.ST.Ref as STRef +import Data.Maybe (Maybe(..)) +import Effect.Class (class MonadEffect, liftEffect) +import Pipes (await, yield) +import Pipes.Core (Pipe) + +-- | Yields a separator value `sep` between received values +-- | +-- | ```purescript +-- | toList $ (yield "a" *> yield "b" *> yield "c") >-> intersperse "," +-- | -- "a" : "," : "b" : "," : "c" : Nil +-- | ``` +intersperse :: forall m a. MonadEffect m => a -> Pipe (Maybe a) (Maybe a) m Unit +intersperse sep = do + isFirst <- liftEffect $ liftST $ STRef.new true + whileJust do + ma <- await + isFirst' <- liftEffect $ liftST $ STRef.read isFirst + case ma of + Just a + | isFirst' -> do + void $ liftEffect $ liftST $ STRef.write false isFirst + yield $ Just a + | otherwise -> yield (Just sep) *> yield (Just a) + Nothing -> yield Nothing + pure $ void ma diff --git a/test/Test/Common.purs b/test/Test/Common.purs new file mode 100644 index 0000000..e467956 --- /dev/null +++ b/test/Test/Common.purs @@ -0,0 +1,38 @@ +module Test.Common where + +import Prelude + +import Control.Monad.Error.Class (class MonadError, liftEither, try) +import Data.Bifunctor (lmap) +import Data.String.Gen (genAlphaString) +import Data.Tuple (fst) +import Data.Tuple.Nested (type (/\), (/\)) +import Effect.Aff (Aff, bracket) +import Effect.Class (liftEffect) +import Effect.Exception (Error, error) +import Node.FS.Sync as FS +import Pipes.Core (Pipe) +import Pipes.Prelude as Pipes +import Simple.JSON (class ReadForeign, class WriteForeign, readJSON, writeJSON) +import Test.QuickCheck.Gen (randomSampleOne, resize) + +tmpFile :: (String -> Aff Unit) -> Aff Unit +tmpFile f = tmpFiles (f <<< fst) + +tmpFiles :: (String /\ String -> Aff Unit) -> Aff Unit +tmpFiles = + let + acq = do + randa <- liftEffect $ randomSampleOne $ resize 10 genAlphaString + randb <- liftEffect $ randomSampleOne $ resize 10 genAlphaString + void $ try $ liftEffect $ FS.mkdir ".tmp" + pure $ (".tmp/tmp." <> randa) /\ (".tmp/tmp." <> randb) + rel (a /\ b) = liftEffect (try (FS.rm a) *> void (try $ FS.rm b)) + in + bracket acq rel + +jsonStringify :: forall m a. Monad m => WriteForeign a => Pipe a String m Unit +jsonStringify = Pipes.map writeJSON + +jsonParse :: forall m @a. MonadError Error m => ReadForeign a => Pipe String a m Unit +jsonParse = Pipes.mapM (liftEither <<< lmap (error <<< show) <<< readJSON) diff --git a/test/Test/Main.purs b/test/Test/Main.purs index e7b57c7..fab2200 100644 --- a/test/Test/Main.purs +++ b/test/Test/Main.purs @@ -6,9 +6,13 @@ import Data.Maybe (Maybe(..)) import Effect (Effect) import Effect.Aff (launchAff_) import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream -import Test.Spec.Reporter (consoleReporter, specReporter) +import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer +import Test.Pipes.Node.FS as Test.Pipes.Node.FS +import Test.Spec.Reporter (specReporter) import Test.Spec.Runner (defaultConfig, runSpec') main :: Effect Unit -main = launchAff_ $ runSpec' (defaultConfig { timeout = Nothing }) [ specReporter ] do +main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do Test.Pipes.Node.Stream.spec + Test.Pipes.Node.Buffer.spec + Test.Pipes.Node.FS.spec diff --git a/test/Test/Pipes.Node.Buffer.purs b/test/Test/Pipes.Node.Buffer.purs new file mode 100644 index 0000000..3f94a9c --- /dev/null +++ b/test/Test/Pipes.Node.Buffer.purs @@ -0,0 +1,69 @@ +module Test.Pipes.Node.Buffer where + +import Prelude + +import Control.Monad.Error.Class (catchError) +import Control.Monad.Gen (chooseInt, sized) +import Data.Array as Array +import Data.FoldableWithIndex (forWithIndex_) +import Data.Int as Int +import Data.String.Gen (genAsciiString) +import Data.Tuple (fst, snd) +import Data.Tuple.Nested ((/\)) +import Effect.Class (liftEffect) +import Effect.Unsafe (unsafePerformEffect) +import Node.Buffer (Buffer, BufferValueType(..)) +import Node.Buffer as Buffer +import Node.Encoding (Encoding(..)) +import Pipes ((>->)) +import Pipes (each) as Pipes +import Pipes.Core (runEffect) as Pipes +import Pipes.Node.Buffer as Pipes.Node.Buffer +import Pipes.Prelude (drain, toListM) as Pipes +import Test.QuickCheck (class Arbitrary) +import Test.QuickCheck.Arbitrary (arbitrary) +import Test.QuickCheck.Gen (randomSample', vectorOf) +import Test.Spec (Spec, describe, it) +import Test.Spec.Assertions (fail, shouldEqual) + +data BufferJunk = BufferJunk Buffer +instance Arbitrary BufferJunk where + arbitrary = sized \s -> do + ns <- vectorOf s (chooseInt 0 7) + pure $ unsafePerformEffect do + buf <- Buffer.alloc s + forWithIndex_ ns \ix n -> Buffer.write UInt8 (Int.toNumber n) ix buf + pure $ BufferJunk buf + +data BufferUTF8 = BufferUTF8 String Buffer +instance Arbitrary BufferUTF8 where + arbitrary = do + s <- genAsciiString + pure $ BufferUTF8 s $ unsafePerformEffect $ Buffer.fromString s UTF8 + +spec :: Spec Unit +spec = describe "Pipes.Node.Buffer" do + describe "toString" do + it "fails when encoding wrong" do + vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary) + let + uut = Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString UTF8 >-> Pipes.drain + ok = do + uut + fail "Should have thrown" + err _ = pure unit + catchError ok err + it "junk OK in hex" do + vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary) + Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString Hex >-> Pipes.drain + it "UTF8 ok" do + vals <- (map \(BufferUTF8 s b) -> s /\ b) <$> liftEffect (randomSample' 100 arbitrary) + let + bufs = Pipes.each $ snd <$> vals + strs = fst <$> vals + act <- Array.fromFoldable <$> Pipes.toListM (bufs >-> Pipes.Node.Buffer.toString UTF8) + act `shouldEqual` strs + describe "fromString" do + it "ok" do + vals <- Pipes.each <$> liftEffect (randomSample' 100 genAsciiString) + Pipes.runEffect $ vals >-> Pipes.Node.Buffer.fromString UTF8 >-> Pipes.drain diff --git a/test/Test/Pipes.Node.FS.purs b/test/Test/Pipes.Node.FS.purs new file mode 100644 index 0000000..5f95b86 --- /dev/null +++ b/test/Test/Pipes.Node.FS.purs @@ -0,0 +1,86 @@ +module Test.Pipes.Node.FS where + +import Prelude + +import Control.Monad.Error.Class (catchError) +import Data.Foldable (fold, intercalate) +import Data.Newtype (wrap) +import Data.Tuple.Nested ((/\)) +import Effect.Class (liftEffect) +import Node.Encoding (Encoding(..)) +import Node.FS.Sync as FS +import Pipes (yield, (>->)) +import Pipes.Core (runEffect) as Pipes +import Pipes.Node.Buffer as Pipes.Node.Buffer +import Pipes.Node.FS as Pipes.Node.FS +import Pipes.Node.Stream (inEOS, unEOS, withEOS) +import Pipes.Prelude (drain, map, toListM) as Pipes +import Pipes.String as Pipes.String +import Pipes.Util as Pipes.Util +import Simple.JSON (writeJSON) +import Test.Common (jsonParse, tmpFile, tmpFiles) +import Test.Spec (Spec, around, describe, it) +import Test.Spec.Assertions (fail, shouldEqual) + +spec :: Spec Unit +spec = describe "Pipes.Node.FS" do + describe "read" do + around tmpFile $ it "fails if the file does not exist" \p -> do + flip catchError (const $ pure unit) do + Pipes.runEffect $ Pipes.Node.FS.read p >-> Pipes.drain + fail "should have thrown" + around tmpFile $ it "reads ok" \p -> do + liftEffect $ FS.writeTextFile UTF8 p "foo" + s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8) + s `shouldEqual` "foo" + around tmpFile $ it "fails if the file already exists" \p -> do + liftEffect $ FS.writeTextFile UTF8 "foo" p + flip catchError (const $ pure unit) do + Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p + fail "should have thrown" + describe "create" do + around tmpFile $ it "creates the file when not exists" \p -> do + Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p + contents <- liftEffect $ FS.readTextFile UTF8 p + contents `shouldEqual` "foo" + around tmpFile $ it "fails if the file already exists" \p -> do + liftEffect $ FS.writeTextFile UTF8 "foo" p + flip catchError (const $ pure unit) do + Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p + fail "should have thrown" + describe "append" do + around tmpFile $ it "creates the file when not exists" \p -> do + Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p + contents <- liftEffect $ FS.readTextFile UTF8 p + contents `shouldEqual` "foo" + around tmpFile $ it "appends" \p -> do + Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p + Pipes.runEffect $ withEOS (yield "\n" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p + Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p + contents <- liftEffect $ FS.readTextFile UTF8 p + contents `shouldEqual` "foo\nbar" + describe "truncate" do + around tmpFile $ it "creates the file when not exists" \p -> do + Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p + contents <- liftEffect $ FS.readTextFile UTF8 p + contents `shouldEqual` "foo" + around tmpFile $ it "overwrites contents" \p -> do + Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p + Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p + contents <- liftEffect $ FS.readTextFile UTF8 p + contents `shouldEqual` "bar" + around tmpFiles $ it "json lines >-> parse >-> _.foo >-> write" \(a /\ b) -> do + let + exp = [{foo: "a"}, {foo: "bar"}, {foo: "123"}] + liftEffect $ FS.writeTextFile UTF8 a $ intercalate "\n" $ writeJSON <$> exp + Pipes.runEffect $ + Pipes.Node.FS.read a + >-> inEOS (Pipes.Node.Buffer.toString UTF8) + >-> Pipes.String.split (wrap "\n") + >-> inEOS (jsonParse @{foo :: String}) + >-> inEOS (Pipes.map _.foo) + >-> Pipes.Util.intersperse "\n" + >-> inEOS (Pipes.Node.Buffer.fromString UTF8) + >-> Pipes.Node.FS.create b + act <- liftEffect $ FS.readTextFile UTF8 b + act `shouldEqual` "a\nbar\n123" diff --git a/test/Test/Pipes.Node.Stream.js b/test/Test/Pipes.Node.Stream.js index 3d4c083..0d37aeb 100644 --- a/test/Test/Pipes.Node.Stream.js +++ b/test/Test/Pipes.Node.Stream.js @@ -1,4 +1,19 @@ import Stream from 'stream' +export const discardTransform = () => new Stream.Transform({ + transform: function(_ck, _enc, cb) { + cb() + }, + objectMode: true +}) + +export const charsTransform = () => new Stream.Transform({ + transform: function(ck, _enc, cb) { + ck.split('').filter(s => !!s).forEach(s => this.push(s)) + cb() + }, + objectMode: true, +}) + /** @type {(a: Array) => Stream.Readable}*/ export const readableFromArray = a => Stream.Readable.from(a) diff --git a/test/Test/Pipes.Node.Stream.purs b/test/Test/Pipes.Node.Stream.purs index dcd3341..8e0c1ab 100644 --- a/test/Test/Pipes.Node.Stream.purs +++ b/test/Test/Pipes.Node.Stream.purs @@ -2,24 +2,18 @@ module Test.Pipes.Node.Stream where import Prelude -import Control.Monad.Error.Class (liftEither, try) -import Control.Monad.Morph (hoist) import Control.Monad.Trans.Class (lift) import Data.Array as Array -import Data.Bifunctor (lmap) -import Data.Foldable (fold, intercalate) +import Data.Foldable (fold) import Data.List ((:)) import Data.List as List -import Data.Maybe (Maybe(..), fromMaybe) +import Data.Maybe (Maybe) import Data.Newtype (wrap) import Data.String.Gen (genAlphaString) -import Data.Traversable (for_, traverse) -import Data.Tuple (fst) import Data.Tuple.Nested (type (/\), (/\)) import Effect (Effect) -import Effect.Aff (Aff, bracket, delay) -import Effect.Class (liftEffect) -import Effect.Exception (error) +import Effect.Aff (Aff, delay) +import Effect.Class (class MonadEffect, liftEffect) import Node.Buffer (Buffer) import Node.Buffer as Buffer import Node.Encoding (Encoding(..)) @@ -27,53 +21,30 @@ import Node.FS.Stream as FS.Stream import Node.FS.Sync as FS import Node.Stream.Object as O import Node.Zlib as Zlib +import Pipes (each) as Pipes import Pipes (yield, (>->)) -import Pipes.Core (Consumer, Producer, Pipe, runEffect) +import Pipes.Core (Consumer, Producer, runEffect) +import Pipes.Node.Buffer as Pipes.Buffer import Pipes.Node.Stream as S -import Pipes.Prelude as Pipe -import Simple.JSON (class ReadForeign, class WriteForeign, readJSON, writeJSON) +import Pipes.Prelude (mapFoldable, toListM) as Pipes +import Simple.JSON (writeJSON) +import Test.Common (jsonParse, jsonStringify, tmpFile, tmpFiles) import Test.QuickCheck.Arbitrary (arbitrary) -import Test.QuickCheck.Gen (randomSample', randomSampleOne, resize) +import Test.QuickCheck.Gen (randomSample') import Test.Spec (Spec, around, describe, it) import Test.Spec.Assertions (shouldEqual) foreign import readableFromArray :: forall @a. Array a -> O.Readable a +foreign import discardTransform :: forall a b. Effect (O.Transform a b) +foreign import charsTransform :: Effect (O.Transform String String) -str2buf :: Pipe (Maybe String) (Maybe Buffer) Aff Unit -str2buf = hoist liftEffect $ Pipe.mapM (traverse $ flip Buffer.fromString UTF8) +writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit) +writer a = do + stream <- liftEffect $ O.fromBufferWritable <$> FS.Stream.createWriteStream a + pure $ stream /\ S.fromWritable stream -buf2str :: Pipe (Maybe Buffer) (Maybe String) Aff Unit -buf2str = hoist liftEffect $ Pipe.mapM (traverse $ Buffer.toString UTF8) - -buf2hex :: Pipe (Maybe Buffer) (Maybe String) Aff Unit -buf2hex = hoist liftEffect $ Pipe.mapM (traverse $ Buffer.toString Hex) - -jsonStringify :: forall a. WriteForeign a => Pipe (Maybe a) (Maybe String) Aff Unit -jsonStringify = Pipe.map (map writeJSON) - -jsonParse :: forall @a. ReadForeign a => Pipe (Maybe String) (Maybe a) Aff Unit -jsonParse = Pipe.mapM (traverse (liftEither <<< lmap (error <<< show) <<< readJSON)) - -writer :: String -> Effect (Consumer (Maybe Buffer) Aff Unit) -writer a = S.fromWritable <$> O.fromBufferWritable <$> FS.Stream.createWriteStream a - -reader :: String -> Effect (Producer (Maybe Buffer) Aff Unit) -reader a = S.fromReadable <$> O.fromBufferReadable <$> FS.Stream.createReadStream a - -tmpFile :: (String -> Aff Unit) -> Aff Unit -tmpFile f = tmpFiles (f <<< fst) - -tmpFiles :: (String /\ String -> Aff Unit) -> Aff Unit -tmpFiles = - let - acq = do - randa <- liftEffect $ randomSampleOne $ resize 10 genAlphaString - randb <- liftEffect $ randomSampleOne $ resize 10 genAlphaString - void $ try $ liftEffect $ FS.mkdir ".tmp" - pure $ ("tmp." <> randa) /\ ("tmp." <> randb) - rel (a /\ b) = liftEffect (try (FS.rm a) *> void (try $ FS.rm b)) - in - bracket acq rel +reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit) +reader a = liftEffect $ S.fromReadable <$> O.fromBufferReadable <$> FS.Stream.createReadStream a spec :: Spec Unit spec = @@ -81,42 +52,42 @@ spec = describe "Readable" do describe "Readable.from()" do it "empty" do - vals <- List.catMaybes <$> (Pipe.toListM $ S.fromReadable $ readableFromArray @{ foo :: String } []) + vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } []) >-> S.unEOS vals `shouldEqual` List.Nil it "singleton" do - vals <- List.catMaybes <$> (Pipe.toListM $ S.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) + vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> S.unEOS vals `shouldEqual` ({ foo: "1" } : List.Nil) it "many elements" do let exp = (\n -> { foo: show n }) <$> Array.range 0 100 - vals <- List.catMaybes <$> (Pipe.toListM $ S.fromReadable $ readableFromArray exp) + vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray exp) >-> S.unEOS vals `shouldEqual` (List.fromFoldable exp) describe "Writable" $ around tmpFile do describe "fs.WriteStream" do it "pipe to file" \p -> do - w <- S.fromWritable <$> O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p) + stream <- O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p) let + w = S.fromWritable stream source = do buf <- liftEffect $ Buffer.fromString "hello" UTF8 - yield $ Just buf - yield Nothing - runEffect $ source >-> w + yield buf + runEffect $ S.withEOS source >-> w contents <- liftEffect $ FS.readTextFile UTF8 p contents `shouldEqual` "hello" + shouldEqual true =<< liftEffect (O.isWritableEnded stream) it "async pipe to file" \p -> do w <- S.fromWritable <$> O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p) let source = do - yield $ Just "hello, " + yield "hello, " lift $ delay $ wrap 5.0 - yield $ Just "world!" + yield "world!" lift $ delay $ wrap 5.0 - yield $ Just " " + yield " " lift $ delay $ wrap 5.0 - yield $ Just "this is a " + yield "this is a " lift $ delay $ wrap 5.0 - yield $ Just "test." - yield Nothing - runEffect $ source >-> str2buf >-> w + yield "test." + runEffect $ S.withEOS (source >-> Pipes.Buffer.fromString UTF8) >-> w contents <- liftEffect $ FS.readTextFile UTF8 p contents `shouldEqual` "hello, world! this is a test." it "chained pipes" \p -> do @@ -126,33 +97,44 @@ spec = num :: Int <- arbitrary stuff :: Array String <- arbitrary pure {str, num, stuff} - objs <- liftEffect $ randomSample' 1 obj + objs <- liftEffect (randomSample' 1 obj) let exp = fold (writeJSON <$> objs) - objs' = for_ (Just <$> objs) yield *> yield Nothing - w <- liftEffect $ writer p - runEffect $ objs' >-> jsonStringify >-> str2buf >-> w + stream /\ w <- liftEffect $ writer p + runEffect $ S.withEOS (Pipes.each objs >-> jsonStringify >-> Pipes.Buffer.fromString UTF8) >-> w contents <- liftEffect $ FS.readTextFile UTF8 p contents `shouldEqual` exp + shouldEqual true =<< liftEffect (O.isWritableEnded stream) describe "Transform" do it "gzip" do let - json = do - yield $ Just $ writeJSON {foo: "bar"} - yield Nothing + json = yield $ writeJSON {foo: "bar"} exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000" gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) - outs :: List.List String <- List.catMaybes <$> Pipe.toListM (json >-> str2buf >-> gzip >-> buf2hex) + outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex) fold outs `shouldEqual` exp around tmpFiles $ it "file >-> gzip >-> file >-> gunzip" \(a /\ b) -> do liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [1, 2, 3, 4] areader <- liftEffect $ reader a - bwriter <- liftEffect $ writer b + bwritestream /\ bwriter <- liftEffect $ writer b gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) runEffect $ areader >-> gzip >-> bwriter + shouldEqual true =<< liftEffect (O.isWritableEnded bwritestream) gunzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip) breader <- liftEffect $ reader b - nums <- Pipe.toListM (breader >-> gunzip >-> buf2str >-> jsonParse @(Array Int) >-> Pipe.mapFoldable (fromMaybe [])) + nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity) Array.fromFoldable nums `shouldEqual` [1, 2, 3, 4] + around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do + liftEffect $ FS.writeTextFile UTF8 p "foo" + r <- reader p + discard' <- liftEffect discardTransform + out :: List.List Int <- Pipes.toListM $ r >-> S.fromTransform discard' >-> S.unEOS + out `shouldEqual` List.Nil + around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do + liftEffect $ FS.writeTextFile UTF8 p "foo bar" + r <- reader p + chars' <- liftEffect charsTransform + out :: List.List String <- Pipes.toListM $ r >-> S.inEOS (Pipes.Buffer.toString UTF8) >-> S.fromTransform chars' >-> S.unEOS + out `shouldEqual` List.fromFoldable ["f", "o", "o", " ", "b", "a", "r"]