feat: Pipes.Node.FS, Pipes.Node.Buffer, etc.

This commit is contained in:
orion 2024-05-10 15:04:09 -05:00
parent 01ebfba9ad
commit 805f3b8887
Signed by: orion
GPG Key ID: 6D4165AE4C928719
16 changed files with 664 additions and 171 deletions

View File

@ -4,19 +4,28 @@ workspace:
path: ./ path: ./
dependencies: dependencies:
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- control: ">=6.0.0 <7.0.0" - arrays
- effect: ">=4.0.0 <5.0.0" - effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0" - either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- mmorph
- newtype: ">=5.0.0 <6.0.0" - newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs
- node-path
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- node-zlib
- parallel
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - st
- strings
- tailrec
- transformers: ">=6.0.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0"
- tuples
- unsafe-coerce: ">=6.0.0 <7.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0"
test_dependencies: test_dependencies:
- console - console
@ -26,6 +35,7 @@ workspace:
- quickcheck - quickcheck
- simple-json - simple-json
- spec - spec
- spec-quickcheck
build_plan: build_plan:
- aff - aff
- ansi - ansi
@ -87,6 +97,7 @@ workspace:
- safe-coerce - safe-coerce
- simple-json - simple-json
- spec - spec
- spec-quickcheck
- st - st
- strings - strings
- tailrec - tailrec
@ -791,6 +802,16 @@ packages:
- tailrec - tailrec
- transformers - transformers
- tuples - tuples
spec-quickcheck:
type: registry
version: 5.0.0
integrity: sha256-iE0iThqZCuDGe3pwg5RvqcL8E5cRQ4txDuloCclOsCs=
dependencies:
- aff
- prelude
- quickcheck
- random
- spec
st: st:
type: registry type: registry
version: 6.2.0 version: 6.2.0

View File

@ -11,19 +11,28 @@ package:
pedanticPackages: true pedanticPackages: true
dependencies: dependencies:
- aff: ">=7.1.0 <8.0.0" - aff: ">=7.1.0 <8.0.0"
- control: ">=6.0.0 <7.0.0" - arrays: ">=7.3.0 <8.0.0"
- effect: ">=4.0.0 <5.0.0" - effect: ">=4.0.0 <5.0.0"
- either: ">=6.1.0 <7.0.0" - either: ">=6.1.0 <7.0.0"
- exceptions: ">=6.0.0 <7.0.0" - exceptions: ">=6.0.0 <7.0.0"
- foldable-traversable: ">=6.0.0 <7.0.0"
- maybe: ">=6.0.0 <7.0.0" - maybe: ">=6.0.0 <7.0.0"
- mmorph: ">=7.0.0 <8.0.0"
- newtype: ">=5.0.0 <6.0.0" - newtype: ">=5.0.0 <6.0.0"
- node-buffer: ">=9.0.0 <10.0.0" - node-buffer: ">=9.0.0 <10.0.0"
- node-event-emitter: ">=3.0.0 <4.0.0" - node-event-emitter: ">=3.0.0 <4.0.0"
- node-fs: ">=9.1.0 <10.0.0"
- node-path: ">=5.0.0 <6.0.0"
- node-streams: ">=9.0.0 <10.0.0" - node-streams: ">=9.0.0 <10.0.0"
- node-zlib: ">=0.4.0 <0.5.0"
- parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"
- transformers: ">=6.0.0 <7.0.0" - transformers: ">=6.0.0 <7.0.0"
- tuples: ">=7.0.0 <8.0.0"
- unsafe-coerce: ">=6.0.0 <7.0.0" - unsafe-coerce: ">=6.0.0 <7.0.0"
test: test:
main: Test.Main main: Test.Main
@ -35,5 +44,6 @@ package:
- quickcheck - quickcheck
- simple-json - simple-json
- spec - spec
- spec-quickcheck
workspace: workspace:
extraPackages: {} extraPackages: {}

View File

@ -0,0 +1,24 @@
module Control.Monad.Cleanup where
import Prelude
import Control.Monad.Error.Class (class MonadError, liftEither, try)
import Control.Monad.State (StateT, modify_, runStateT)
import Data.Tuple.Nested ((/\))
type CleanupT m = StateT (m Unit) m
finally :: forall m. Monad m => (m Unit) -> CleanupT m Unit
finally m = modify_ (_ *> m)
runCleanup :: forall m a. Monad m => CleanupT m a -> m a
runCleanup m = do
a /\ final <- runStateT m (pure unit)
final
pure a
runCleanupE :: forall e m a. MonadError e m => CleanupT m a -> m a
runCleanupE m = do
ea /\ final <- runStateT (try m) (pure unit)
final
liftEither ea

View File

@ -2,7 +2,16 @@ module Node.Stream.Object where
import Prelude import Prelude
import Control.Monad.Error.Class (liftEither)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Global (Global)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as STRef
import Control.Parallel (parOneOf)
import Data.Either (Either(..)) import Data.Either (Either(..))
import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..))
import Data.Show.Generic (genericShow)
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Aff, effectCanceler, makeAff) import Effect.Aff (Aff, effectCanceler, makeAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
@ -19,11 +28,19 @@ data ReadResult a
= ReadWouldBlock = ReadWouldBlock
| ReadClosed | ReadClosed
| ReadJust a | ReadJust a
derive instance Generic (ReadResult a) _
derive instance Functor ReadResult
derive instance Eq a => Eq (ReadResult a)
instance Show (ReadResult a) where
show = genericShow <<< map (const "..")
data WriteResult data WriteResult
= WriteWouldBlock = WriteWouldBlock
| WriteClosed | WriteClosed
| WriteOk | WriteOk
derive instance Generic WriteResult _
derive instance Eq WriteResult
instance Show WriteResult where show = genericShow
type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a } type ReadResultFFI a = { closed :: ReadResult a, wouldBlock :: ReadResult a, just :: a -> ReadResult a }
type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult } type WriteResultFFI = { closed :: WriteResult, wouldBlock :: WriteResult, ok :: WriteResult }
@ -100,6 +117,12 @@ else instance (Write s a) => Write s a where
write s a = write s a write s a = write s a
end s = end s end s = end s
withErrorST :: forall s. Stream s => s -> Effect {cancel :: Effect Unit, error :: STRef Global (Maybe Error)}
withErrorST s = do
error <- liftST $ STRef.new Nothing
cancel <- flip (Event.once errorH) s \e -> void $ liftST $ STRef.write (Just e) error
pure {error, cancel}
fromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer fromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer
fromBufferReadable = unsafeCoerce fromBufferReadable = unsafeCoerce
@ -123,34 +146,26 @@ awaitReadableOrClosed s = do
closed <- liftEffect $ isClosed s closed <- liftEffect $ isClosed s
ended <- liftEffect $ isReadableEnded s ended <- liftEffect $ isReadableEnded s
readable <- liftEffect $ isReadable s readable <- liftEffect $ isReadable s
when (not ended && not closed && not readable) $ makeAff \res -> do when (not ended && not closed && not readable)
cancelClose <- Event.once closeH (res $ Right unit) s $ liftEither =<< parOneOf [onceAff0 readableH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s]
cancelError <- Event.once errorH (res <<< Left) s
cancelReadable <- flip (Event.once readableH) s do
cancelClose
cancelError
res $ Right unit
pure $ effectCanceler do
cancelReadable
cancelClose
cancelError
awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit
awaitWritableOrClosed s = do awaitWritableOrClosed s = do
closed <- liftEffect $ isClosed s closed <- liftEffect $ isClosed s
ended <- liftEffect $ isWritableEnded s ended <- liftEffect $ isWritableEnded s
writable <- liftEffect $ isWritable s writable <- liftEffect $ isWritable s
when (not closed && not ended && not writable) $ makeAff \res -> do when (not ended && not closed && not writable)
cancelClose <- Event.once closeH (res $ Right unit) s $ liftEither =<< parOneOf [onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s]
cancelError <- Event.once errorH (res <<< Left) s
cancelDrain <- flip (Event.once drainH) s do onceAff0 :: forall e. EventHandle0 e -> e -> Aff Unit
cancelClose onceAff0 h emitter = makeAff \res -> do
cancelError cancel <- Event.once h (res $ Right unit) emitter
res $ Right unit pure $ effectCanceler cancel
pure $ effectCanceler do
cancelDrain onceAff1 :: forall e a. EventHandle1 e a -> e -> Aff a
cancelClose onceAff1 h emitter = makeAff \res -> do
cancelError cancel <- Event.once h (res <<< Right) emitter
pure $ effectCanceler cancel
readableH :: forall s a. Read s a => EventHandle0 s readableH :: forall s a. Read s a => EventHandle0 s
readableH = EventHandle "readable" identity readableH = EventHandle "readable" identity

View File

@ -0,0 +1,17 @@
module Pipes.Node.Buffer where
import Prelude
import Control.Monad.Morph (hoist)
import Effect.Class (class MonadEffect, liftEffect)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Encoding (Encoding)
import Pipes.Core (Pipe)
import Pipes.Prelude as Pipes
toString :: forall m. MonadEffect m => Encoding -> Pipe Buffer String m Unit
toString enc = hoist liftEffect $ Pipes.mapM $ Buffer.toString enc
fromString :: forall m. MonadEffect m => Encoding -> Pipe String Buffer m Unit
fromString enc = hoist liftEffect $ Pipes.mapM $ flip Buffer.fromString enc

57
src/Pipes.Node.FS.purs Normal file
View File

@ -0,0 +1,57 @@
module Pipes.Node.FS where
import Prelude
import Data.Maybe (Maybe)
import Effect.Aff (Aff)
import Effect.Class (liftEffect)
import Node.Buffer (Buffer)
import Node.FS.Stream (WriteStreamOptions)
import Node.FS.Stream as FS.Stream
import Node.Path (FilePath)
import Node.Stream.Object as O
import Pipes.Core (Consumer, Producer)
import Pipes.Node.Stream (fromReadable, fromWritable)
import Prim.Row (class Union)
-- | Creates a `fs.Writable` stream for the file
-- | at the given path.
-- |
-- | Writing `Nothing` to this pipe will close the stream.
-- |
-- | See `Pipes.Node.Stream.withEOS` for converting `Producer a`
-- | into `Producer (Maybe a)`, emitting `Nothing` before exiting.
write
:: forall r trash
. Union r trash WriteStreamOptions
=> Record r -> FilePath -> Consumer (Maybe Buffer) Aff Unit
write o p = do
w <- liftEffect $ FS.Stream.createWriteStream' p o
fromWritable $ O.fromBufferWritable w
-- | Open a file in write mode, failing if the file already exists.
-- |
-- | `write {flags: "wx"}`
create :: FilePath -> Consumer (Maybe Buffer) Aff Unit
create = write {flags: "wx"}
-- | Open a file in write mode, truncating it if the file already exists.
-- |
-- | `write {flags: "w"}`
truncate :: FilePath -> Consumer (Maybe Buffer) Aff Unit
truncate = write {flags: "w"}
-- | Open a file in write mode, appending written contents if the file already exists.
-- |
-- | `write {flags: "a"}`
append :: FilePath -> Consumer (Maybe Buffer) Aff Unit
append = write {flags: "a"}
-- | Creates a `fs.Readable` stream for the file at the given path.
-- |
-- | Emits `Nothing` before closing. To opt out of this behavior,
-- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`.
read :: FilePath -> Producer (Maybe Buffer) Aff Unit
read p = do
r <- liftEffect $ FS.Stream.createReadStream p
fromReadable $ O.fromBufferReadable r

View File

@ -2,94 +2,134 @@ module Pipes.Node.Stream where
import Prelude import Prelude
import Control.Alternative (empty) import Control.Monad.Error.Class (throwError)
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (whileJust) import Control.Monad.Rec.Class (whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Data.Maybe (Maybe(..)) import Data.Maybe (Maybe(..), maybe)
import Data.Newtype (wrap) import Data.Newtype (wrap)
import Data.Traversable (for_)
import Effect.Aff (Aff, delay) import Effect.Aff (Aff, delay)
import Effect.Aff.Class (liftAff) import Effect.Aff.Class (liftAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Node.Stream.Object as O import Node.Stream.Object as O
import Pipes (await, yield) import Pipes (await, yield, (>->))
import Pipes.Core (Consumer, Pipe, Producer) import Pipes (for) as P
import Pipes.Internal (Proxy) import Pipes.Core (Consumer, Pipe, Producer, Producer_)
import Pipes.Internal as P.I import Pipes.Prelude (mapFoldable, map) as P
type ProxyFFI :: Type -> Type -> Type -> Type -> Type -> Type -> Type -- | Convert a `Readable` stream to a `Pipe`.
type ProxyFFI a' a b' b r pipe = -- |
{ pure :: r -> pipe -- | This will yield `Nothing` before exiting, signaling
, request :: a' -> (a -> pipe) -> pipe -- | End-of-stream.
, respond :: b -> (b' -> pipe) -> pipe fromReadable :: forall s a. O.Read s a => s -> Producer_ (Maybe a) Aff Unit
} fromReadable r =
let
proxyFFI :: forall m a' a b' b r. ProxyFFI a' a b' b r (Proxy a' a b' b m r) cleanup rmErrorListener = do
proxyFFI = { pure: P.I.Pure, request: P.I.Request, respond: P.I.Respond } liftEffect rmErrorListener
go {error, cancel} = do
fromReadable :: forall s a. O.Read s a => s -> Producer (Maybe a) Aff Unit
fromReadable r = whileJust do
liftAff $ delay $ wrap 0.0 liftAff $ delay $ wrap 0.0
a <- liftEffect $ O.read r err <- liftEffect $ liftST $ STRef.read error
case a of for_ err throwError
O.ReadWouldBlock -> do
lift $ O.awaitReadableOrClosed r
pure $ Just unit
O.ReadClosed -> do
yield Nothing
pure Nothing
O.ReadJust a' -> do
yield $ Just a'
pure $ Just unit
res <- liftEffect $ O.read r
case res of
O.ReadJust a -> yield (Just a) *> go {error, cancel}
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) *> go {error, cancel}
O.ReadClosed -> yield Nothing *> cleanup cancel
in do
e <- liftEffect $ O.withErrorST r
go e
-- | Convert a `Writable` stream to a `Pipe`.
-- |
-- | When `Nothing` is piped to this, the stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromWritable :: forall s a. O.Write s a => s -> Consumer (Maybe a) Aff Unit fromWritable :: forall s a. O.Write s a => s -> Consumer (Maybe a) Aff Unit
fromWritable w = do fromWritable w =
whileJust $ runMaybeT do let
cleanup rmErrorListener = do
liftEffect rmErrorListener
liftEffect $ O.end w
go {error, cancel} = do
liftAff $ delay $ wrap 0.0 liftAff $ delay $ wrap 0.0
a <- MaybeT await err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
ma <- await
case ma of
Nothing -> cleanup cancel
Just a -> do
res <- liftEffect $ O.write w a res <- liftEffect $ O.write w a
case res of case res of
O.WriteClosed -> empty O.WriteOk -> go {error, cancel}
O.WriteOk -> pure unit O.WriteWouldBlock -> liftAff (O.awaitWritableOrClosed w) *> go {error, cancel}
O.WriteWouldBlock -> do O.WriteClosed -> pure unit
liftAff $ O.awaitWritableOrClosed w in do
pure unit r <- liftEffect $ O.withErrorST w
liftEffect $ O.end w go r
-- | Convert a `Transform` stream to a `Pipe`.
-- |
-- | When `Nothing` is piped to this, the `Transform` stream will
-- | be `end`ed, and the pipe will noop if invoked again.
fromTransform :: forall a b. O.Transform a b -> Pipe (Maybe a) (Maybe b) Aff Unit fromTransform :: forall a b. O.Transform a b -> Pipe (Maybe a) (Maybe b) Aff Unit
fromTransform t = fromTransform t =
let let
read' {exitOnWouldBlock} = cleanup removeErrorListener = do
whileJust $ runMaybeT do
liftAff $ delay $ wrap 0.0
res <- liftEffect $ O.read t
case res of
O.ReadWouldBlock ->
if exitOnWouldBlock then do
empty
else do
liftAff $ O.awaitReadableOrClosed t
pure unit
O.ReadJust b -> do
lift $ yield $ Just b
pure unit
O.ReadClosed -> do
lift $ yield Nothing
empty
in do
whileJust $ runMaybeT do
liftAff $ delay $ wrap 0.0
a <- MaybeT await
writeRes <- liftEffect $ O.write t a
lift $ read' {exitOnWouldBlock: true}
case writeRes of
O.WriteOk -> pure unit
O.WriteClosed -> empty
O.WriteWouldBlock -> do
liftAff $ O.awaitWritableOrClosed t
pure unit
liftEffect $ O.end t liftEffect $ O.end t
read' {exitOnWouldBlock: false} liftEffect $ removeErrorListener
fromReadable t
yieldFromReadableHalf = do
res <- liftEffect (O.read t)
case res of
O.ReadJust a -> yield (Just a)
O.ReadWouldBlock -> pure unit
O.ReadClosed -> yield Nothing *> pure unit
go {error, cancel} = do
liftAff $ delay $ wrap 0.0
err <- liftEffect $ liftST $ STRef.read error
for_ err throwError
ma <- await
case ma of
Nothing -> cleanup cancel
Just a' -> do
res <- liftEffect $ O.write t a'
yieldFromReadableHalf
case res of
O.WriteOk -> go {error, cancel}
O.WriteWouldBlock -> lift (O.awaitWritableOrClosed t) *> go {error, cancel}
O.WriteClosed -> cleanup cancel
in do
r <- liftEffect $ O.withErrorST t
go r
-- | Given a `Producer` of values, wrap them in `Just`.
-- |
-- | Before the `Producer` exits, emits `Nothing` as an End-of-stream signal.
withEOS :: forall a. Producer a Aff Unit -> Producer (Maybe a) Aff Unit
withEOS a = do
P.for a (yield <<< Just)
yield Nothing
-- | Strip a pipeline of the EOS signal
unEOS :: forall a. Pipe (Maybe a) a Aff Unit
unEOS = P.mapFoldable identity
-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`.
-- |
-- | Allows easily using pipes not concerned with the EOS signal with
-- | pipes that do need this signal.
-- |
-- | (ex. `Pipes.Node.Buffer.toString` doesn't need an EOS signal, but `Pipes.Node.FS.create` does.)
-- |
-- | `Just` values will be passed to the pipe, and the response(s) will be wrapped in `Just`.
-- |
-- | `Nothing` will bypass the given pipe entirely, and the pipe will not be invoked again.
inEOS :: forall a b. Pipe a b Aff Unit -> Pipe (Maybe a) (Maybe b) Aff Unit
inEOS p = whileJust do
ma <- await
maybe (yield Nothing) (\a -> yield a >-> p >-> P.map Just) ma
pure $ void ma

40
src/Pipes.Node.Zlib.purs Normal file
View File

@ -0,0 +1,40 @@
module Pipes.Node.Zlib where
import Prelude
import Data.Maybe (Maybe)
import Effect (Effect)
import Effect.Aff (Aff)
import Effect.Class (liftEffect)
import Node.Buffer (Buffer)
import Node.Stream.Object as O
import Node.Zlib as Zlib
import Node.Zlib.Types (ZlibStream)
import Pipes.Core (Pipe)
import Pipes.Node.Stream (fromTransform)
fromZlib :: forall r. Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
fromZlib z = do
raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.fromBufferTransform raw
gzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
gzip = fromZlib Zlib.createGzip
gunzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
gunzip = fromZlib Zlib.createGunzip
unzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
unzip = fromZlib Zlib.createUnzip
inflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
inflate = fromZlib Zlib.createInflate
deflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
deflate = fromZlib Zlib.createDeflate
brotliCompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit
brotliDecompress = fromZlib Zlib.createBrotliDecompress

43
src/Pipes.String.purs Normal file
View File

@ -0,0 +1,43 @@
module Pipes.String where
import Prelude
import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.Trans.Class (lift)
import Data.Array.ST as Array.ST
import Data.Foldable (fold, traverse_)
import Data.Maybe (Maybe(..))
import Data.String (Pattern)
import Data.String as String
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield)
import Pipes.Core (Pipe)
-- | Accumulate string chunks until `pat` is seen, then `yield` the buffered
-- | string up to (and not including) the pattern.
-- |
-- | When end-of-stream is reached, yields the remaining buffered string then `Nothing`.
-- |
-- | ```
-- | toList $ yield "foo,bar,baz" >-> split ","
-- | -- "foo" : "bar" : "baz" : Nil
-- | ```
split :: forall m. MonadEffect m => Pattern -> Pipe (Maybe String) (Maybe String) m Unit
split pat = do
buf <- liftEffect $ liftST $ Array.ST.new
whileJust $ runMaybeT do
chunk <- MaybeT await
case String.indexOf pat chunk of
Nothing -> void $ liftEffect $ liftST $ Array.ST.push chunk buf
Just ix -> do
let
{before, after} = String.splitAt ix chunk
len <- liftEffect $ liftST $ Array.ST.length buf
buf' <- liftEffect $ liftST $ Array.ST.splice 0 len [] buf
lift $ yield $ Just $ (fold buf') <> before
void $ liftEffect $ liftST $ Array.ST.push (String.drop 1 after) buf
buf' <- liftEffect $ liftST $ Array.ST.unsafeFreeze buf
traverse_ yield (Just <$> String.split pat (fold buf'))
yield Nothing

32
src/Pipes.Util.purs Normal file
View File

@ -0,0 +1,32 @@
module Pipes.Util where
import Prelude
import Control.Monad.Rec.Class (whileJust)
import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef
import Data.Maybe (Maybe(..))
import Effect.Class (class MonadEffect, liftEffect)
import Pipes (await, yield)
import Pipes.Core (Pipe)
-- | Yields a separator value `sep` between received values
-- |
-- | ```purescript
-- | toList $ (yield "a" *> yield "b" *> yield "c") >-> intersperse ","
-- | -- "a" : "," : "b" : "," : "c" : Nil
-- | ```
intersperse :: forall m a. MonadEffect m => a -> Pipe (Maybe a) (Maybe a) m Unit
intersperse sep = do
isFirst <- liftEffect $ liftST $ STRef.new true
whileJust do
ma <- await
isFirst' <- liftEffect $ liftST $ STRef.read isFirst
case ma of
Just a
| isFirst' -> do
void $ liftEffect $ liftST $ STRef.write false isFirst
yield $ Just a
| otherwise -> yield (Just sep) *> yield (Just a)
Nothing -> yield Nothing
pure $ void ma

38
test/Test/Common.purs Normal file
View File

@ -0,0 +1,38 @@
module Test.Common where
import Prelude
import Control.Monad.Error.Class (class MonadError, liftEither, try)
import Data.Bifunctor (lmap)
import Data.String.Gen (genAlphaString)
import Data.Tuple (fst)
import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff (Aff, bracket)
import Effect.Class (liftEffect)
import Effect.Exception (Error, error)
import Node.FS.Sync as FS
import Pipes.Core (Pipe)
import Pipes.Prelude as Pipes
import Simple.JSON (class ReadForeign, class WriteForeign, readJSON, writeJSON)
import Test.QuickCheck.Gen (randomSampleOne, resize)
tmpFile :: (String -> Aff Unit) -> Aff Unit
tmpFile f = tmpFiles (f <<< fst)
tmpFiles :: (String /\ String -> Aff Unit) -> Aff Unit
tmpFiles =
let
acq = do
randa <- liftEffect $ randomSampleOne $ resize 10 genAlphaString
randb <- liftEffect $ randomSampleOne $ resize 10 genAlphaString
void $ try $ liftEffect $ FS.mkdir ".tmp"
pure $ (".tmp/tmp." <> randa) /\ (".tmp/tmp." <> randb)
rel (a /\ b) = liftEffect (try (FS.rm a) *> void (try $ FS.rm b))
in
bracket acq rel
jsonStringify :: forall m a. Monad m => WriteForeign a => Pipe a String m Unit
jsonStringify = Pipes.map writeJSON
jsonParse :: forall m @a. MonadError Error m => ReadForeign a => Pipe String a m Unit
jsonParse = Pipes.mapM (liftEither <<< lmap (error <<< show) <<< readJSON)

View File

@ -6,9 +6,13 @@ import Data.Maybe (Maybe(..))
import Effect (Effect) import Effect (Effect)
import Effect.Aff (launchAff_) import Effect.Aff (launchAff_)
import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream import Test.Pipes.Node.Stream as Test.Pipes.Node.Stream
import Test.Spec.Reporter (consoleReporter, specReporter) import Test.Pipes.Node.Buffer as Test.Pipes.Node.Buffer
import Test.Pipes.Node.FS as Test.Pipes.Node.FS
import Test.Spec.Reporter (specReporter)
import Test.Spec.Runner (defaultConfig, runSpec') import Test.Spec.Runner (defaultConfig, runSpec')
main :: Effect Unit main :: Effect Unit
main = launchAff_ $ runSpec' (defaultConfig { timeout = Nothing }) [ specReporter ] do main = launchAff_ $ runSpec' (defaultConfig { failFast = true, timeout = Nothing }) [ specReporter ] do
Test.Pipes.Node.Stream.spec Test.Pipes.Node.Stream.spec
Test.Pipes.Node.Buffer.spec
Test.Pipes.Node.FS.spec

View File

@ -0,0 +1,69 @@
module Test.Pipes.Node.Buffer where
import Prelude
import Control.Monad.Error.Class (catchError)
import Control.Monad.Gen (chooseInt, sized)
import Data.Array as Array
import Data.FoldableWithIndex (forWithIndex_)
import Data.Int as Int
import Data.String.Gen (genAsciiString)
import Data.Tuple (fst, snd)
import Data.Tuple.Nested ((/\))
import Effect.Class (liftEffect)
import Effect.Unsafe (unsafePerformEffect)
import Node.Buffer (Buffer, BufferValueType(..))
import Node.Buffer as Buffer
import Node.Encoding (Encoding(..))
import Pipes ((>->))
import Pipes (each) as Pipes
import Pipes.Core (runEffect) as Pipes
import Pipes.Node.Buffer as Pipes.Node.Buffer
import Pipes.Prelude (drain, toListM) as Pipes
import Test.QuickCheck (class Arbitrary)
import Test.QuickCheck.Arbitrary (arbitrary)
import Test.QuickCheck.Gen (randomSample', vectorOf)
import Test.Spec (Spec, describe, it)
import Test.Spec.Assertions (fail, shouldEqual)
data BufferJunk = BufferJunk Buffer
instance Arbitrary BufferJunk where
arbitrary = sized \s -> do
ns <- vectorOf s (chooseInt 0 7)
pure $ unsafePerformEffect do
buf <- Buffer.alloc s
forWithIndex_ ns \ix n -> Buffer.write UInt8 (Int.toNumber n) ix buf
pure $ BufferJunk buf
data BufferUTF8 = BufferUTF8 String Buffer
instance Arbitrary BufferUTF8 where
arbitrary = do
s <- genAsciiString
pure $ BufferUTF8 s $ unsafePerformEffect $ Buffer.fromString s UTF8
spec :: Spec Unit
spec = describe "Pipes.Node.Buffer" do
describe "toString" do
it "fails when encoding wrong" do
vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary)
let
uut = Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString UTF8 >-> Pipes.drain
ok = do
uut
fail "Should have thrown"
err _ = pure unit
catchError ok err
it "junk OK in hex" do
vals <- Pipes.each <$> (map \(BufferJunk b) -> b) <$> liftEffect (randomSample' 10 arbitrary)
Pipes.runEffect $ vals >-> Pipes.Node.Buffer.toString Hex >-> Pipes.drain
it "UTF8 ok" do
vals <- (map \(BufferUTF8 s b) -> s /\ b) <$> liftEffect (randomSample' 100 arbitrary)
let
bufs = Pipes.each $ snd <$> vals
strs = fst <$> vals
act <- Array.fromFoldable <$> Pipes.toListM (bufs >-> Pipes.Node.Buffer.toString UTF8)
act `shouldEqual` strs
describe "fromString" do
it "ok" do
vals <- Pipes.each <$> liftEffect (randomSample' 100 genAsciiString)
Pipes.runEffect $ vals >-> Pipes.Node.Buffer.fromString UTF8 >-> Pipes.drain

View File

@ -0,0 +1,86 @@
module Test.Pipes.Node.FS where
import Prelude
import Control.Monad.Error.Class (catchError)
import Data.Foldable (fold, intercalate)
import Data.Newtype (wrap)
import Data.Tuple.Nested ((/\))
import Effect.Class (liftEffect)
import Node.Encoding (Encoding(..))
import Node.FS.Sync as FS
import Pipes (yield, (>->))
import Pipes.Core (runEffect) as Pipes
import Pipes.Node.Buffer as Pipes.Node.Buffer
import Pipes.Node.FS as Pipes.Node.FS
import Pipes.Node.Stream (inEOS, unEOS, withEOS)
import Pipes.Prelude (drain, map, toListM) as Pipes
import Pipes.String as Pipes.String
import Pipes.Util as Pipes.Util
import Simple.JSON (writeJSON)
import Test.Common (jsonParse, tmpFile, tmpFiles)
import Test.Spec (Spec, around, describe, it)
import Test.Spec.Assertions (fail, shouldEqual)
spec :: Spec Unit
spec = describe "Pipes.Node.FS" do
describe "read" do
around tmpFile $ it "fails if the file does not exist" \p -> do
flip catchError (const $ pure unit) do
Pipes.runEffect $ Pipes.Node.FS.read p >-> Pipes.drain
fail "should have thrown"
around tmpFile $ it "reads ok" \p -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8)
s `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"
describe "create" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "fails if the file already exists" \p -> do
liftEffect $ FS.writeTextFile UTF8 "foo" p
flip catchError (const $ pure unit) do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p
fail "should have thrown"
describe "append" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "appends" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
Pipes.runEffect $ withEOS (yield "\n" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo\nbar"
describe "truncate" do
around tmpFile $ it "creates the file when not exists" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "foo"
around tmpFile $ it "overwrites contents" \p -> do
Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p
contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "bar"
around tmpFiles $ it "json lines >-> parse >-> _.foo >-> write" \(a /\ b) -> do
let
exp = [{foo: "a"}, {foo: "bar"}, {foo: "123"}]
liftEffect $ FS.writeTextFile UTF8 a $ intercalate "\n" $ writeJSON <$> exp
Pipes.runEffect $
Pipes.Node.FS.read a
>-> inEOS (Pipes.Node.Buffer.toString UTF8)
>-> Pipes.String.split (wrap "\n")
>-> inEOS (jsonParse @{foo :: String})
>-> inEOS (Pipes.map _.foo)
>-> Pipes.Util.intersperse "\n"
>-> inEOS (Pipes.Node.Buffer.fromString UTF8)
>-> Pipes.Node.FS.create b
act <- liftEffect $ FS.readTextFile UTF8 b
act `shouldEqual` "a\nbar\n123"

View File

@ -1,4 +1,19 @@
import Stream from 'stream' import Stream from 'stream'
export const discardTransform = () => new Stream.Transform({
transform: function(_ck, _enc, cb) {
cb()
},
objectMode: true
})
export const charsTransform = () => new Stream.Transform({
transform: function(ck, _enc, cb) {
ck.split('').filter(s => !!s).forEach(s => this.push(s))
cb()
},
objectMode: true,
})
/** @type {(a: Array<unknown>) => Stream.Readable}*/ /** @type {(a: Array<unknown>) => Stream.Readable}*/
export const readableFromArray = a => Stream.Readable.from(a) export const readableFromArray = a => Stream.Readable.from(a)

View File

@ -2,24 +2,18 @@ module Test.Pipes.Node.Stream where
import Prelude import Prelude
import Control.Monad.Error.Class (liftEither, try)
import Control.Monad.Morph (hoist)
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Data.Array as Array import Data.Array as Array
import Data.Bifunctor (lmap) import Data.Foldable (fold)
import Data.Foldable (fold, intercalate)
import Data.List ((:)) import Data.List ((:))
import Data.List as List import Data.List as List
import Data.Maybe (Maybe(..), fromMaybe) import Data.Maybe (Maybe)
import Data.Newtype (wrap) import Data.Newtype (wrap)
import Data.String.Gen (genAlphaString) import Data.String.Gen (genAlphaString)
import Data.Traversable (for_, traverse)
import Data.Tuple (fst)
import Data.Tuple.Nested (type (/\), (/\)) import Data.Tuple.Nested (type (/\), (/\))
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Aff, bracket, delay) import Effect.Aff (Aff, delay)
import Effect.Class (liftEffect) import Effect.Class (class MonadEffect, liftEffect)
import Effect.Exception (error)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.Buffer as Buffer import Node.Buffer as Buffer
import Node.Encoding (Encoding(..)) import Node.Encoding (Encoding(..))
@ -27,53 +21,30 @@ import Node.FS.Stream as FS.Stream
import Node.FS.Sync as FS import Node.FS.Sync as FS
import Node.Stream.Object as O import Node.Stream.Object as O
import Node.Zlib as Zlib import Node.Zlib as Zlib
import Pipes (each) as Pipes
import Pipes (yield, (>->)) import Pipes (yield, (>->))
import Pipes.Core (Consumer, Producer, Pipe, runEffect) import Pipes.Core (Consumer, Producer, runEffect)
import Pipes.Node.Buffer as Pipes.Buffer
import Pipes.Node.Stream as S import Pipes.Node.Stream as S
import Pipes.Prelude as Pipe import Pipes.Prelude (mapFoldable, toListM) as Pipes
import Simple.JSON (class ReadForeign, class WriteForeign, readJSON, writeJSON) import Simple.JSON (writeJSON)
import Test.Common (jsonParse, jsonStringify, tmpFile, tmpFiles)
import Test.QuickCheck.Arbitrary (arbitrary) import Test.QuickCheck.Arbitrary (arbitrary)
import Test.QuickCheck.Gen (randomSample', randomSampleOne, resize) import Test.QuickCheck.Gen (randomSample')
import Test.Spec (Spec, around, describe, it) import Test.Spec (Spec, around, describe, it)
import Test.Spec.Assertions (shouldEqual) import Test.Spec.Assertions (shouldEqual)
foreign import readableFromArray :: forall @a. Array a -> O.Readable a foreign import readableFromArray :: forall @a. Array a -> O.Readable a
foreign import discardTransform :: forall a b. Effect (O.Transform a b)
foreign import charsTransform :: Effect (O.Transform String String)
str2buf :: Pipe (Maybe String) (Maybe Buffer) Aff Unit writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit)
str2buf = hoist liftEffect $ Pipe.mapM (traverse $ flip Buffer.fromString UTF8) writer a = do
stream <- liftEffect $ O.fromBufferWritable <$> FS.Stream.createWriteStream a
pure $ stream /\ S.fromWritable stream
buf2str :: Pipe (Maybe Buffer) (Maybe String) Aff Unit reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit)
buf2str = hoist liftEffect $ Pipe.mapM (traverse $ Buffer.toString UTF8) reader a = liftEffect $ S.fromReadable <$> O.fromBufferReadable <$> FS.Stream.createReadStream a
buf2hex :: Pipe (Maybe Buffer) (Maybe String) Aff Unit
buf2hex = hoist liftEffect $ Pipe.mapM (traverse $ Buffer.toString Hex)
jsonStringify :: forall a. WriteForeign a => Pipe (Maybe a) (Maybe String) Aff Unit
jsonStringify = Pipe.map (map writeJSON)
jsonParse :: forall @a. ReadForeign a => Pipe (Maybe String) (Maybe a) Aff Unit
jsonParse = Pipe.mapM (traverse (liftEither <<< lmap (error <<< show) <<< readJSON))
writer :: String -> Effect (Consumer (Maybe Buffer) Aff Unit)
writer a = S.fromWritable <$> O.fromBufferWritable <$> FS.Stream.createWriteStream a
reader :: String -> Effect (Producer (Maybe Buffer) Aff Unit)
reader a = S.fromReadable <$> O.fromBufferReadable <$> FS.Stream.createReadStream a
tmpFile :: (String -> Aff Unit) -> Aff Unit
tmpFile f = tmpFiles (f <<< fst)
tmpFiles :: (String /\ String -> Aff Unit) -> Aff Unit
tmpFiles =
let
acq = do
randa <- liftEffect $ randomSampleOne $ resize 10 genAlphaString
randb <- liftEffect $ randomSampleOne $ resize 10 genAlphaString
void $ try $ liftEffect $ FS.mkdir ".tmp"
pure $ ("tmp." <> randa) /\ ("tmp." <> randb)
rel (a /\ b) = liftEffect (try (FS.rm a) *> void (try $ FS.rm b))
in
bracket acq rel
spec :: Spec Unit spec :: Spec Unit
spec = spec =
@ -81,42 +52,42 @@ spec =
describe "Readable" do describe "Readable" do
describe "Readable.from(<Iterable>)" do describe "Readable.from(<Iterable>)" do
it "empty" do it "empty" do
vals <- List.catMaybes <$> (Pipe.toListM $ S.fromReadable $ readableFromArray @{ foo :: String } []) vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } []) >-> S.unEOS
vals `shouldEqual` List.Nil vals `shouldEqual` List.Nil
it "singleton" do it "singleton" do
vals <- List.catMaybes <$> (Pipe.toListM $ S.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray @{ foo :: String } [ { foo: "1" } ]) >-> S.unEOS
vals `shouldEqual` ({ foo: "1" } : List.Nil) vals `shouldEqual` ({ foo: "1" } : List.Nil)
it "many elements" do it "many elements" do
let exp = (\n -> { foo: show n }) <$> Array.range 0 100 let exp = (\n -> { foo: show n }) <$> Array.range 0 100
vals <- List.catMaybes <$> (Pipe.toListM $ S.fromReadable $ readableFromArray exp) vals <- Pipes.toListM $ (S.fromReadable $ readableFromArray exp) >-> S.unEOS
vals `shouldEqual` (List.fromFoldable exp) vals `shouldEqual` (List.fromFoldable exp)
describe "Writable" $ around tmpFile do describe "Writable" $ around tmpFile do
describe "fs.WriteStream" do describe "fs.WriteStream" do
it "pipe to file" \p -> do it "pipe to file" \p -> do
w <- S.fromWritable <$> O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p) stream <- O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p)
let let
w = S.fromWritable stream
source = do source = do
buf <- liftEffect $ Buffer.fromString "hello" UTF8 buf <- liftEffect $ Buffer.fromString "hello" UTF8
yield $ Just buf yield buf
yield Nothing runEffect $ S.withEOS source >-> w
runEffect $ source >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "hello" contents `shouldEqual` "hello"
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
it "async pipe to file" \p -> do it "async pipe to file" \p -> do
w <- S.fromWritable <$> O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p) w <- S.fromWritable <$> O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p)
let let
source = do source = do
yield $ Just "hello, " yield "hello, "
lift $ delay $ wrap 5.0 lift $ delay $ wrap 5.0
yield $ Just "world!" yield "world!"
lift $ delay $ wrap 5.0 lift $ delay $ wrap 5.0
yield $ Just " " yield " "
lift $ delay $ wrap 5.0 lift $ delay $ wrap 5.0
yield $ Just "this is a " yield "this is a "
lift $ delay $ wrap 5.0 lift $ delay $ wrap 5.0
yield $ Just "test." yield "test."
yield Nothing runEffect $ S.withEOS (source >-> Pipes.Buffer.fromString UTF8) >-> w
runEffect $ source >-> str2buf >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` "hello, world! this is a test." contents `shouldEqual` "hello, world! this is a test."
it "chained pipes" \p -> do it "chained pipes" \p -> do
@ -126,33 +97,44 @@ spec =
num :: Int <- arbitrary num :: Int <- arbitrary
stuff :: Array String <- arbitrary stuff :: Array String <- arbitrary
pure {str, num, stuff} pure {str, num, stuff}
objs <- liftEffect $ randomSample' 1 obj objs <- liftEffect (randomSample' 1 obj)
let let
exp = fold (writeJSON <$> objs) exp = fold (writeJSON <$> objs)
objs' = for_ (Just <$> objs) yield *> yield Nothing stream /\ w <- liftEffect $ writer p
w <- liftEffect $ writer p runEffect $ S.withEOS (Pipes.each objs >-> jsonStringify >-> Pipes.Buffer.fromString UTF8) >-> w
runEffect $ objs' >-> jsonStringify >-> str2buf >-> w
contents <- liftEffect $ FS.readTextFile UTF8 p contents <- liftEffect $ FS.readTextFile UTF8 p
contents `shouldEqual` exp contents `shouldEqual` exp
shouldEqual true =<< liftEffect (O.isWritableEnded stream)
describe "Transform" do describe "Transform" do
it "gzip" do it "gzip" do
let let
json = do json = yield $ writeJSON {foo: "bar"}
yield $ Just $ writeJSON {foo: "bar"}
yield Nothing
exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000" exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000"
gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
outs :: List.List String <- List.catMaybes <$> Pipe.toListM (json >-> str2buf >-> gzip >-> buf2hex) outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex)
fold outs `shouldEqual` exp fold outs `shouldEqual` exp
around tmpFiles around tmpFiles
$ it "file >-> gzip >-> file >-> gunzip" \(a /\ b) -> do $ it "file >-> gzip >-> file >-> gunzip" \(a /\ b) -> do
liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [1, 2, 3, 4] liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [1, 2, 3, 4]
areader <- liftEffect $ reader a areader <- liftEffect $ reader a
bwriter <- liftEffect $ writer b bwritestream /\ bwriter <- liftEffect $ writer b
gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip)
runEffect $ areader >-> gzip >-> bwriter runEffect $ areader >-> gzip >-> bwriter
shouldEqual true =<< liftEffect (O.isWritableEnded bwritestream)
gunzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip) gunzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip)
breader <- liftEffect $ reader b breader <- liftEffect $ reader b
nums <- Pipe.toListM (breader >-> gunzip >-> buf2str >-> jsonParse @(Array Int) >-> Pipe.mapFoldable (fromMaybe [])) nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity)
Array.fromFoldable nums `shouldEqual` [1, 2, 3, 4] Array.fromFoldable nums `shouldEqual` [1, 2, 3, 4]
around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo"
r <- reader p
discard' <- liftEffect discardTransform
out :: List.List Int <- Pipes.toListM $ r >-> S.fromTransform discard' >-> S.unEOS
out `shouldEqual` List.Nil
around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo bar"
r <- reader p
chars' <- liftEffect charsTransform
out :: List.List String <- Pipes.toListM $ r >-> S.inEOS (Pipes.Buffer.toString UTF8) >-> S.fromTransform chars' >-> S.unEOS
out `shouldEqual` List.fromFoldable ["f", "o", "o", " ", "b", "a", "r"]