diff --git a/README.md b/README.md index 06856fc..3cafe8e 100644 --- a/README.md +++ b/README.md @@ -2,41 +2,114 @@ Interact with node streams in object mode using [`Pipes`]! -## Example -```purescript -import Prelude - -import Effect.Aff (launchAff_) -import Effect (Effect) -import Effect.Class (liftEffect) -import Effect.Console (log) -import Pipes.Prelude ((>->)) -import Pipes.Prelude as Pipes -import Pipes.Core as Pipes.Core -import Pipes.Node.FS.Stream as FS -import Pipes.Node.Zlib as Zlib -import Pipes.CSV.Parse as CSV.Parse - --- == my-zipped-data.csv == --- id,foo,is_deleted --- 1,hello,f --- 2,goodbye,t - --- Logs: --- {id: 1, foo: "hello", is_deleted: false} --- {id: 2, foo: "goodbye", is_deleted: true} -main :: Effect Unit -main = - Pipes.Core.runEffect - $ FS.createReadStream "my-zipped-data.csv.gz" - >-> Zlib.gunzip - >-> CSV.Parse.parse @{id :: Int, foo :: String, is_deleted :: Boolean} - >-> Pipes.mapM (liftEffect <<< log) -``` - -## Installing +## Install ```bash spago install node-stream-pipes ``` +## Usage +### Node Streams +#### Raw Streams +Raw `objectMode` Node streams are represented in `Node.Stream.Object`: + - `Writable a` accepts chunks of type `a` + - `Readable a` emits chunks of type `a` + - `Transform a b` transforms chunks from `a` to `b` + +Non-Object streams can also be represented with these types; for example an `fs.WriteStream` +can be coerced to `Writable Buffer` without issue. + +Interop between these types and `Node.Stream` are provided in `Node.Stream.Object`: +- `unsafeFrom{String,Buffer}{Writable,Readable,Transform}` +- `unsafeCoerce{Writable,Readable,Transform}` + +#### Pipes +Streams in `Node.Stream.Object` can be converted to `Producer`s, `Consumer`s and `Pipe`s with `Pipes.Node.Stream`: + - `fromReadable :: forall a. -> Producer (Maybe a) Unit` + - `fromWritable :: forall a. -> Consumer (Maybe a) Unit` + - `fromTransform :: forall a b. -> Pipe (Maybe a) (Maybe b) Unit` + +#### EOS Marker +Normally, pipe computations will not be executed once any computation in a pipeline exits. + +To allow for resource cleanup and awareness that the stream is about to close, +`Maybe a` is used occasionally in this package as an End-of-Stream marker: + +```purescript +-- foo.txt is "hello, world!\n" +chunks <- Pipes.Collect.toArray $ Pipes.FS.read "foo.txt" >-> Pipes.Node.Stream.inEOS (Pipes.Buffer.toString UTF8) +chunks `shouldEqual` [Just "hello, world!\n", Nothing] +``` + +Pipes from `a -> b` unaware of EOS can be lifted to `Maybe a -> Maybe b` with `Pipes.Node.Stream.inEOS`. + +Producers of `Maybe a` can drop the EOS marker and emit `a` with `Pipes.Node.Stream.unEOS`. + +Producers of `a` can have an EOS marker added with `Pipes.Node.Stream.withEOS`. + +#### Example +`Pipes.PassThrough.js` +```javascript +import {PassThrough} from 'stream' + +export const makePassThrough = () => new PassThrough() +``` + +`Pipes.PassThrough.purs` +```purescript +module Pipes.PassThrough where + +import Prelude + +import Effect (Effect) +import Effect.Class (liftEffect) +import Effect.Aff (Aff) +import Pipes.Core (Pipe) +import Node.Stream.Object as ObjectStream +import Pipes.Node.Stream as Pipes.Node.Stream + +type PassThroughStream a = ObjectStream.Transform a a + +foreign import makeRaw :: Effect PassThroughStream + +passThrough :: forall a. Pipe a a Aff Unit +passThrough = do + raw <- liftEffect $ makeRaw + Pipes.Node.Stream.fromTransform raw +``` + +### Utilities +This package provides utilities that explicitly use `MonadRec` to ensure stack-safety +when dealing with producers of large amounts of data. + +- `Pipes.Collect` provides stack-safe utilities for executing a pipeline and collecting results into a collection, `Buffer`, `Monoid` etc. +- `Pipes.Construct` provides stack-safe utilities for creating producers from in-memory collections. +- `Pipes.Util` provides some miscellaneous utilities missing from `pipes`. + +### Zlib +Pipes for compression & decompression using `zlib` are provided in `Pipes.Node.Zlib`. + +### FS +Read files with: +- `Pipes.Node.FS.read ` +- `Pipes.Node.FS.read' ` + +```purescript +Pipes.Collect.toStringWith UTF8 $ Pipes.Node.FS.read "foo.txt" >-> Pipes.Stream.unEOS +``` + +Write files with: + - `Pipes.Node.FS.write' ` + - `Pipes.Node.FS.trunc ` + - `Pipes.Node.FS.create ` + - `Pipes.Node.FS.append ` + +```purescript + Pipes.Stream.withEOS ( + Pipes.Construct.eachArray ["id,name", "1,henry", "2,suzie"] + >-> Pipes.Util.intersperse "\n" + >-> Pipes.Buffer.fromString UTF8 + ) + >-> Pipes.Node.FS.create "foo.csv" +``` + [`Pipes`]: https://pursuit.purescript.org/packages/purescript-pipes/8.0.0 diff --git a/src/Node.Stream.Object.purs b/src/Node.Stream.Object.purs index 55fed8b..578d4df 100644 --- a/src/Node.Stream.Object.purs +++ b/src/Node.Stream.Object.purs @@ -34,6 +34,10 @@ derive instance Eq a => Eq (ReadResult a) instance Show (ReadResult a) where show = genericShow <<< map (const "..") +maybeReadResult :: forall a. ReadResult a -> Maybe a +maybeReadResult (ReadWouldBlock) = Nothing +maybeReadResult (ReadJust a) = Just a + data WriteResult = WriteWouldBlock | WriteOk @@ -134,23 +138,32 @@ withErrorST s = do cancel <- flip (Event.once errorH) s \e -> void $ liftST $ STRef.write (Just e) error pure { error, cancel } -fromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer -fromBufferReadable = unsafeCoerce +unsafeCoerceWritable :: forall r a. Stream.Writable r -> Writable a +unsafeCoerceWritable = unsafeCoerce -fromBufferTransform :: Stream.Duplex -> Transform Buffer Buffer -fromBufferTransform = unsafeCoerce +unsafeCoerceReadable :: forall r a. Stream.Readable r -> Readable a +unsafeCoerceReadable = unsafeCoerce -fromBufferWritable :: forall r. Stream.Writable r -> Writable Buffer -fromBufferWritable = unsafeCoerce +unsafeCoerceTransform :: forall a b. Stream.Duplex -> Transform a b +unsafeCoerceTransform = unsafeCoerce -fromStringReadable :: forall r. Stream.Readable r -> Readable String -fromStringReadable = unsafeCoerce +unsafeFromBufferReadable :: forall r. Stream.Readable r -> Readable Buffer +unsafeFromBufferReadable = unsafeCoerce -fromStringTransform :: Stream.Duplex -> Transform String String -fromStringTransform = unsafeCoerce +unsafeFromBufferTransform :: forall a. Stream.Duplex -> Transform Buffer a +unsafeFromBufferTransform = unsafeCoerce -fromStringWritable :: forall r. Stream.Writable r -> Writable String -fromStringWritable = unsafeCoerce +unsafeFromBufferWritable :: forall r. Stream.Writable r -> Writable Buffer +unsafeFromBufferWritable = unsafeCoerce + +unsafeFromStringReadable :: forall r. Stream.Readable r -> Readable String +unsafeFromStringReadable = unsafeCoerce + +unsafeFromStringTransform :: forall a. Stream.Duplex -> Transform String a +unsafeFromStringTransform = unsafeCoerce + +unsafeFromStringWritable :: forall r. Stream.Writable r -> Writable String +unsafeFromStringWritable = unsafeCoerce awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit awaitReadableOrClosed s = do diff --git a/src/Pipes.Collect.purs b/src/Pipes.Collect.purs index b45ec93..9d52f5d 100644 --- a/src/Pipes.Collect.purs +++ b/src/Pipes.Collect.purs @@ -20,6 +20,7 @@ import Foreign.Object.ST as Object.ST import Foreign.Object.ST.Unsafe as Object.ST.Unsafe import Node.Buffer (Buffer) import Node.Buffer as Buffer +import Node.Encoding (Encoding) import Pipes.Core (Producer) import Pipes.Internal (Proxy(..)) @@ -51,7 +52,16 @@ fold f b0 p0 = traverse (\b a -> pure $ f b a) b0 p0 foreach :: forall a m. MonadRec m => (a -> m Unit) -> Producer a m Unit -> m Unit foreach f p0 = traverse (\_ a -> f a) unit p0 --- | Concatenate all produced buffers +-- | `append` all emitted values to `mempty` +toMonoid :: forall a m. Monoid a => MonadRec m => MonadEffect m => Producer a m Unit -> m a +toMonoid = fold (<>) mempty + +-- | Concatenate all buffers to a single buffer, then decode with the +-- | provided encoding. +toStringWith :: forall m. MonadRec m => MonadEffect m => Encoding -> Producer Buffer m Unit -> m String +toStringWith enc = (liftEffect <<< Buffer.toString enc) <=< toBuffer + +-- | Concatenate all produced buffers to a single buffer toBuffer :: forall m. MonadRec m => MonadEffect m => Producer Buffer m Unit -> m Buffer toBuffer p = (liftEffect <<< maybe (Buffer.alloc 0) pure) @@ -74,9 +84,18 @@ toArray p = do liftEffect $ liftST $ Array.ST.unsafeFreeze st -- | Collect all values from a `Producer` into a list. +-- | +-- | Reverses the list after collecting, so that values will be +-- | in the order they were emitted. toList :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a) toList = map List.reverse <<< fold (flip List.Cons) List.Nil +-- | Collect all values from a `Producer` into a list. +-- | +-- | Does not reverse the list after collecting. +toListRev :: forall a m. MonadRec m => MonadEffect m => Producer a m Unit -> m (List a) +toListRev = map List.reverse <<< fold (flip List.Cons) List.Nil + -- | Collect all values from a `Producer` into a Javascript Object. toObject :: forall a m. MonadRec m => MonadEffect m => Producer (String /\ a) m Unit -> m (Object a) toObject p = do diff --git a/src/Pipes.Node.FS.purs b/src/Pipes.Node.FS.purs index 7e1ffec..ecd48f6 100644 --- a/src/Pipes.Node.FS.purs +++ b/src/Pipes.Node.FS.purs @@ -23,7 +23,7 @@ import Prim.Row (class Union) -- | -- | See `Pipes.Node.Stream.withEOS` for converting `Producer a` -- | into `Producer (Maybe a)`, emitting `Nothing` before exiting. -write +write' :: forall r trash m . Union r trash WriteStreamOptions => MonadAff m @@ -31,27 +31,27 @@ write => Record r -> FilePath -> Consumer (Maybe Buffer) m Unit -write o p = do +write' o p = do w <- liftEffect $ FS.Stream.createWriteStream' p o - fromWritable $ O.fromBufferWritable w + fromWritable $ O.unsafeCoerceWritable w -- | Open a file in write mode, failing if the file already exists. -- | --- | `write {flags: "wx"}` +-- | `write' {flags: "wx"}` create :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit -create = write { flags: "wx" } +create = write' { flags: "wx" } -- | Open a file in write mode, truncating it if the file already exists. -- | --- | `write {flags: "w"}` -truncate :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit -truncate = write { flags: "w" } +-- | `write' {flags: "w"}` +trunc :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit +trunc = write' { flags: "w" } -- | Open a file in write mode, appending written contents if the file already exists. -- | --- | `write {flags: "a"}` +-- | `write' {flags: "a"}` append :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit -append = write { flags: "a" } +append = write' { flags: "a" } -- | Creates a `fs.Readable` stream for the file at the given path. -- | @@ -60,7 +60,7 @@ append = write { flags: "a" } read :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Producer (Maybe Buffer) m Unit read p = do r <- liftEffect $ FS.Stream.createReadStream p - fromReadable $ O.fromBufferReadable r + fromReadable $ O.unsafeCoerceReadable r -- | Creates a `fs.Readable` stream for the file at the given path. -- | @@ -76,4 +76,4 @@ read' -> Producer (Maybe Buffer) m Unit read' opts p = do r <- liftEffect $ FS.Stream.createReadStream' p opts - fromReadable $ O.fromBufferReadable r + fromReadable $ O.unsafeCoerceReadable r diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index e690026..4d089c3 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -3,13 +3,13 @@ module Pipes.Node.Stream where import Prelude hiding (join) import Control.Monad.Error.Class (class MonadThrow, throwError) -import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) +import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM, whileJust) import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Ref as STRef import Control.Monad.Trans.Class (lift) -import Data.Maybe (Maybe(..)) +import Data.Maybe (Maybe(..), maybe) import Data.Newtype (wrap) -import Data.Traversable (for_) +import Data.Traversable (for_, traverse, traverse_) import Data.Tuple.Nested ((/\)) import Effect.Aff (delay) import Effect.Aff.Class (class MonadAff, liftAff) @@ -57,90 +57,74 @@ fromReadable r = -- | When `Nothing` is piped to this, the stream will -- | be `end`ed, and the pipe will noop if invoked again. fromWritable :: forall s a m. MonadThrow Error m => MonadAff m => O.Write s a => s -> Consumer (Maybe a) m Unit -fromWritable w = +fromWritable w = do + { error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST w + let - cleanup rmErrorListener = do - liftEffect rmErrorListener - liftEffect $ O.end w + maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST) + + waitCanWrite = do + shouldWait <- liftEffect $ O.needsDrain w + when shouldWait $ liftAff $ O.awaitWritableOrClosed w + + cleanup = do liftAff $ O.awaitFinished w - pure $ Done unit + maybeThrow + liftEffect removeErrorListener - go { error, cancel } = do - err <- liftEffect $ liftST $ STRef.read error - for_ err throwError - - needsDrain <- liftEffect $ O.needsDrain w - when needsDrain $ liftAff $ O.awaitWritableOrClosed w + onEOS = liftEffect (O.end w) *> cleanup $> Done unit + onChunk a = liftEffect (O.write w a) $> Loop unit + go _ = do + maybeThrow + waitCanWrite ended <- liftEffect $ O.isWritableEnded w if ended then - cleanup cancel + cleanup $> Done unit else - await >>= case _ of - Nothing -> cleanup cancel - Just a -> do - void $ liftEffect $ O.write w a - pure $ Loop { error, cancel } - in - do - r <- liftEffect $ O.withErrorST w - tailRecM go r + await >>= maybe onEOS onChunk + + tailRecM go unit -- | Convert a `Transform` stream to a `Pipe`. -- | -- | When `Nothing` is piped to this, the `Transform` stream will -- | be `end`ed, and the pipe will noop if invoked again. fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit -fromTransform t = +fromTransform t = do + { error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST t let - cleanup removeErrorListener = do - liftEffect $ O.end t + maybeThrow = traverse_ throwError =<< liftEffect (liftST $ STRef.read errorST) + + cleanup = do liftAff $ O.awaitFinished t fromReadable t + maybeThrow liftEffect $ removeErrorListener - pure $ Done unit - yieldWhileReadable = do - flip tailRecM unit \_ -> do - res <- liftEffect $ O.read t - case res of - O.ReadJust a -> yield (Just a) $> Loop unit - _ -> pure $ Done unit + yieldWhileReadable = void $ whileJust $ maybeYield1 - maybeYield1 = do - res <- liftEffect $ O.read t - case res of - O.ReadJust a -> yield $ Just a - _ -> pure unit + maybeYield1 = traverse (\a -> yield (Just a) $> Just unit) =<< O.maybeReadResult <$> liftEffect (O.read t) - go { error, cancel } = do - err <- liftEffect $ liftST $ STRef.read error - for_ err throwError + onEOS = liftEffect (O.end t) *> cleanup $> Done unit + onChunk a = + liftEffect (O.write t a) + >>= case _ of + O.WriteOk -> maybeYield1 $> Loop unit + O.WriteWouldBlock -> yieldWhileReadable $> Loop unit + go _ = do + maybeThrow needsDrain <- liftEffect $ O.needsDrain t ended <- liftEffect $ O.isWritableEnded t - if needsDrain then do - liftAff $ delay $ wrap 0.0 - yieldWhileReadable - pure $ Loop { error, cancel } + if needsDrain then + liftAff (delay $ wrap 0.0) *> yieldWhileReadable $> Loop unit else if ended then - cleanup cancel + cleanup $> Done unit else - await >>= case _ of - Nothing -> cleanup cancel - Just a' -> do - res <- liftEffect $ O.write t a' - case res of - O.WriteOk -> do - maybeYield1 - pure $ Loop { error, cancel } - O.WriteWouldBlock -> do - yieldWhileReadable - pure $ Loop { error, cancel } - in - do - r <- liftEffect $ O.withErrorST t - tailRecM go r + await >>= maybe onEOS onChunk + + tailRecM go unit -- | Given a `Producer` of values, wrap them in `Just`. -- | diff --git a/src/Pipes.Node.Zlib.purs b/src/Pipes.Node.Zlib.purs index ed41b78..2be7955 100644 --- a/src/Pipes.Node.Zlib.purs +++ b/src/Pipes.Node.Zlib.purs @@ -18,7 +18,7 @@ import Pipes.Node.Stream (fromTransform) fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit fromZlib z = do raw <- liftEffect $ Zlib.toDuplex <$> z - fromTransform $ O.fromBufferTransform raw + fromTransform $ O.unsafeCoerceTransform raw gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit gzip = fromZlib Zlib.createGzip diff --git a/test/Test/Pipes.Node.FS.purs b/test/Test/Pipes.Node.FS.purs index 998df76..61d8642 100644 --- a/test/Test/Pipes.Node.FS.purs +++ b/test/Test/Pipes.Node.FS.purs @@ -33,11 +33,6 @@ spec = describe "Pipes.Node.FS" do liftEffect $ FS.writeTextFile UTF8 p "foo" s <- fold <$> Pipes.toListM (Pipes.Node.FS.read p >-> unEOS >-> Pipes.Node.Buffer.toString UTF8) s `shouldEqual` "foo" - around tmpFile $ it "fails if the file already exists" \p -> do - liftEffect $ FS.writeTextFile UTF8 p "foo" - flip catchError (const $ pure unit) do - Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p - fail "should have thrown" describe "create" do around tmpFile $ it "creates the file when not exists" \p -> do Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.create p @@ -59,14 +54,14 @@ spec = describe "Pipes.Node.FS" do Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.append p contents <- liftEffect $ FS.readTextFile UTF8 p contents `shouldEqual` "foo\nbar" - describe "truncate" do + describe "trunc" do around tmpFile $ it "creates the file when not exists" \p -> do - Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p + Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p contents <- liftEffect $ FS.readTextFile UTF8 p contents `shouldEqual` "foo" around tmpFile $ it "overwrites contents" \p -> do - Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p - Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.truncate p + Pipes.runEffect $ withEOS (yield "foo" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p + Pipes.runEffect $ withEOS (yield "bar" >-> Pipes.Node.Buffer.fromString UTF8) >-> Pipes.Node.FS.trunc p contents <- liftEffect $ FS.readTextFile UTF8 p contents `shouldEqual` "bar" around tmpFiles $ it "json lines >-> parse >-> _.foo >-> write" \(a /\ b) -> do diff --git a/test/Test/Pipes.Node.Stream.purs b/test/Test/Pipes.Node.Stream.purs index cd8f1b2..bf992b0 100644 --- a/test/Test/Pipes.Node.Stream.purs +++ b/test/Test/Pipes.Node.Stream.purs @@ -40,11 +40,11 @@ foreign import charsTransform :: Effect (O.Transform String String) writer :: forall m. MonadEffect m => String -> m (O.Writable Buffer /\ Consumer (Maybe Buffer) Aff Unit) writer a = do - stream <- liftEffect $ O.fromBufferWritable <$> FS.Stream.createWriteStream a + stream <- liftEffect $ O.unsafeCoerceWritable <$> FS.Stream.createWriteStream a pure $ stream /\ S.fromWritable stream reader :: forall m. MonadEffect m => String -> m (Producer (Maybe Buffer) Aff Unit) -reader a = liftEffect $ S.fromReadable <$> O.fromBufferReadable <$> FS.Stream.createReadStream a +reader a = liftEffect $ S.fromReadable <$> O.unsafeCoerceReadable <$> FS.Stream.createReadStream a spec :: Spec Unit spec = @@ -64,7 +64,7 @@ spec = describe "Writable" $ around tmpFile do describe "fs.WriteStream" do it "pipe to file" \p -> do - stream <- O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p) + stream <- O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p) let w = S.fromWritable stream source = do @@ -75,7 +75,7 @@ spec = contents `shouldEqual` "hello" shouldEqual true =<< liftEffect (O.isWritableEnded stream) it "async pipe to file" \p -> do - w <- S.fromWritable <$> O.fromBufferWritable <$> liftEffect (FS.Stream.createWriteStream p) + w <- S.fromWritable <$> O.unsafeCoerceWritable <$> liftEffect (FS.Stream.createWriteStream p) let source = do yield "hello, " @@ -110,7 +110,7 @@ spec = let json = yield $ writeJSON { foo: "bar" } exp = "1f8b0800000000000003ab564acbcf57b2524a4a2c52aa0500eff52bfe0d000000" - gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) + gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) outs :: List.List String <- Pipes.toListM (S.withEOS (json >-> Pipes.Buffer.fromString UTF8) >-> gzip >-> S.unEOS >-> Pipes.Buffer.toString Hex) fold outs `shouldEqual` exp around tmpFiles @@ -118,11 +118,11 @@ spec = liftEffect $ FS.writeTextFile UTF8 a $ writeJSON [ 1, 2, 3, 4 ] areader <- liftEffect $ reader a bwritestream /\ bwriter <- liftEffect $ writer b - gzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) + gzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGzip) runEffect $ areader >-> gzip >-> bwriter shouldEqual true =<< liftEffect (O.isWritableEnded bwritestream) - gunzip <- S.fromTransform <$> O.fromBufferTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip) + gunzip <- S.fromTransform <$> O.unsafeCoerceTransform <$> liftEffect (Zlib.toDuplex <$> Zlib.createGunzip) breader <- liftEffect $ reader b nums <- Pipes.toListM (breader >-> gunzip >-> S.unEOS >-> Pipes.Buffer.toString UTF8 >-> jsonParse @(Array Int) >-> Pipes.mapFoldable identity) Array.fromFoldable nums `shouldEqual` [ 1, 2, 3, 4 ]