fix: generalize all Affs to MonadAffs

This commit is contained in:
orion 2024-05-11 22:08:02 -05:00
parent 2fdf6f0dad
commit 407491f055
Signed by: orion
GPG Key ID: 6D4165AE4C928719
3 changed files with 33 additions and 26 deletions

View File

@ -2,9 +2,11 @@ module Pipes.Node.FS where
import Prelude import Prelude
import Control.Monad.Error.Class (class MonadThrow)
import Data.Maybe (Maybe) import Data.Maybe (Maybe)
import Effect.Aff (Aff) import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.FS.Stream (WriteStreamOptions) import Node.FS.Stream (WriteStreamOptions)
import Node.FS.Stream as FS.Stream import Node.FS.Stream as FS.Stream
@ -22,11 +24,13 @@ import Prim.Row (class Union)
-- | See `Pipes.Node.Stream.withEOS` for converting `Producer a` -- | See `Pipes.Node.Stream.withEOS` for converting `Producer a`
-- | into `Producer (Maybe a)`, emitting `Nothing` before exiting. -- | into `Producer (Maybe a)`, emitting `Nothing` before exiting.
write write
:: forall r trash :: forall r trash m
. Union r trash WriteStreamOptions . Union r trash WriteStreamOptions
=> MonadAff m
=> MonadThrow Error m
=> Record r => Record r
-> FilePath -> FilePath
-> Consumer (Maybe Buffer) Aff Unit -> Consumer (Maybe Buffer) m Unit
write o p = do write o p = do
w <- liftEffect $ FS.Stream.createWriteStream' p o w <- liftEffect $ FS.Stream.createWriteStream' p o
fromWritable $ O.fromBufferWritable w fromWritable $ O.fromBufferWritable w
@ -34,26 +38,26 @@ write o p = do
-- | Open a file in write mode, failing if the file already exists. -- | Open a file in write mode, failing if the file already exists.
-- | -- |
-- | `write {flags: "wx"}` -- | `write {flags: "wx"}`
create :: FilePath -> Consumer (Maybe Buffer) Aff Unit create :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
create = write { flags: "wx" } create = write { flags: "wx" }
-- | Open a file in write mode, truncating it if the file already exists. -- | Open a file in write mode, truncating it if the file already exists.
-- | -- |
-- | `write {flags: "w"}` -- | `write {flags: "w"}`
truncate :: FilePath -> Consumer (Maybe Buffer) Aff Unit truncate :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
truncate = write { flags: "w" } truncate = write { flags: "w" }
-- | Open a file in write mode, appending written contents if the file already exists. -- | Open a file in write mode, appending written contents if the file already exists.
-- | -- |
-- | `write {flags: "a"}` -- | `write {flags: "a"}`
append :: FilePath -> Consumer (Maybe Buffer) Aff Unit append :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Consumer (Maybe Buffer) m Unit
append = write { flags: "a" } append = write { flags: "a" }
-- | Creates a `fs.Readable` stream for the file at the given path. -- | Creates a `fs.Readable` stream for the file at the given path.
-- | -- |
-- | Emits `Nothing` before closing. To opt out of this behavior, -- | Emits `Nothing` before closing. To opt out of this behavior,
-- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`. -- | use `Pipes.Node.Stream.withoutEOS` or `Pipes.Node.Stream.unEOS`.
read :: FilePath -> Producer (Maybe Buffer) Aff Unit read :: forall m. MonadAff m => MonadThrow Error m => FilePath -> Producer (Maybe Buffer) m Unit
read p = do read p = do
r <- liftEffect $ FS.Stream.createReadStream p r <- liftEffect $ FS.Stream.createReadStream p
fromReadable $ O.fromBufferReadable r fromReadable $ O.fromBufferReadable r

View File

@ -2,7 +2,7 @@ module Pipes.Node.Stream where
import Prelude import Prelude
import Control.Monad.Error.Class (throwError) import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
@ -11,9 +11,10 @@ import Data.Maybe (Maybe(..))
import Data.Newtype (wrap) import Data.Newtype (wrap)
import Data.Traversable (for_) import Data.Traversable (for_)
import Data.Tuple.Nested ((/\)) import Data.Tuple.Nested ((/\))
import Effect.Aff (Aff, delay) import Effect.Aff (delay)
import Effect.Aff.Class (liftAff) import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Stream.Object as O import Node.Stream.Object as O
import Pipes (await, yield) import Pipes (await, yield)
import Pipes (for) as P import Pipes (for) as P
@ -25,7 +26,7 @@ import Pipes.Util (InvokeResult(..), invoke)
-- | -- |
-- | This will yield `Nothing` before exiting, signaling -- | This will yield `Nothing` before exiting, signaling
-- | End-of-stream. -- | End-of-stream.
fromReadable :: forall s a. O.Read s a => s -> Producer_ (Maybe a) Aff Unit fromReadable :: forall s a m. MonadThrow Error m => MonadAff m => O.Read s a => s -> Producer_ (Maybe a) m Unit
fromReadable r = fromReadable r =
let let
cleanup rmErrorListener = do cleanup rmErrorListener = do
@ -40,7 +41,7 @@ fromReadable r =
res <- liftEffect $ O.read r res <- liftEffect $ O.read r
case res of case res of
O.ReadJust a -> yield (Just a) $> Loop { error, cancel } O.ReadJust a -> yield (Just a) $> Loop { error, cancel }
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) $> Loop { error, cancel } O.ReadWouldBlock -> liftAff (O.awaitReadableOrClosed r) $> Loop { error, cancel }
O.ReadClosed -> yield Nothing *> cleanup cancel O.ReadClosed -> yield Nothing *> cleanup cancel
in in
do do
@ -51,7 +52,7 @@ fromReadable r =
-- | -- |
-- | When `Nothing` is piped to this, the stream will -- | When `Nothing` is piped to this, the stream will
-- | be `end`ed, and the pipe will noop if invoked again. -- | be `end`ed, and the pipe will noop if invoked again.
fromWritable :: forall s a. O.Write s a => s -> Consumer (Maybe a) Aff Unit fromWritable :: forall s a m. MonadThrow Error m => MonadAff m => O.Write s a => s -> Consumer (Maybe a) m Unit
fromWritable w = fromWritable w =
let let
cleanup rmErrorListener = do cleanup rmErrorListener = do
@ -84,7 +85,7 @@ fromWritable w =
-- | -- |
-- | When `Nothing` is piped to this, the `Transform` stream will -- | When `Nothing` is piped to this, the `Transform` stream will
-- | be `end`ed, and the pipe will noop if invoked again. -- | be `end`ed, and the pipe will noop if invoked again.
fromTransform :: forall a b. O.Transform a b -> Pipe (Maybe a) (Maybe b) Aff Unit fromTransform :: forall a b m. MonadThrow Error m => MonadAff m => O.Transform a b -> Pipe (Maybe a) (Maybe b) m Unit
fromTransform t = fromTransform t =
let let
cleanup removeErrorListener = do cleanup removeErrorListener = do
@ -113,7 +114,7 @@ fromTransform t =
O.WriteClosed -> cleanup cancel O.WriteClosed -> cleanup cancel
O.WriteOk -> pure $ Loop { error, cancel } O.WriteOk -> pure $ Loop { error, cancel }
O.WriteWouldBlock -> do O.WriteWouldBlock -> do
lift (O.awaitWritableOrClosed t) liftAff $ O.awaitWritableOrClosed t
pure $ Loop { error, cancel } pure $ Loop { error, cancel }
in in
do do
@ -123,13 +124,13 @@ fromTransform t =
-- | Given a `Producer` of values, wrap them in `Just`. -- | Given a `Producer` of values, wrap them in `Just`.
-- | -- |
-- | Before the `Producer` exits, emits `Nothing` as an End-of-stream signal. -- | Before the `Producer` exits, emits `Nothing` as an End-of-stream signal.
withEOS :: forall a. Producer a Aff Unit -> Producer (Maybe a) Aff Unit withEOS :: forall a m. Monad m => Producer a m Unit -> Producer (Maybe a) m Unit
withEOS a = do withEOS a = do
P.for a (yield <<< Just) P.for a (yield <<< Just)
yield Nothing yield Nothing
-- | Strip a pipeline of the EOS signal -- | Strip a pipeline of the EOS signal
unEOS :: forall a. Pipe (Maybe a) a Aff Unit unEOS :: forall a m. Monad m => Pipe (Maybe a) a m Unit
unEOS = P.mapFoldable identity unEOS = P.mapFoldable identity
-- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`. -- | Lift a `Pipe a a` to `Pipe (Maybe a) (Maybe a)`.

View File

@ -2,10 +2,12 @@ module Pipes.Node.Zlib where
import Prelude import Prelude
import Control.Monad.Error.Class (class MonadThrow)
import Data.Maybe (Maybe) import Data.Maybe (Maybe)
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Aff) import Effect.Aff.Class (class MonadAff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error)
import Node.Buffer (Buffer) import Node.Buffer (Buffer)
import Node.Stream.Object as O import Node.Stream.Object as O
import Node.Zlib as Zlib import Node.Zlib as Zlib
@ -13,28 +15,28 @@ import Node.Zlib.Types (ZlibStream)
import Pipes.Core (Pipe) import Pipes.Core (Pipe)
import Pipes.Node.Stream (fromTransform) import Pipes.Node.Stream (fromTransform)
fromZlib :: forall r. Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> Pipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib z = do fromZlib z = do
raw <- liftEffect $ Zlib.toDuplex <$> z raw <- liftEffect $ Zlib.toDuplex <$> z
fromTransform $ O.fromBufferTransform raw fromTransform $ O.fromBufferTransform raw
gzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit gzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip = fromZlib Zlib.createGzip gzip = fromZlib Zlib.createGzip
gunzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit gunzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
gunzip = fromZlib Zlib.createGunzip gunzip = fromZlib Zlib.createGunzip
unzip :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit unzip :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
unzip = fromZlib Zlib.createUnzip unzip = fromZlib Zlib.createUnzip
inflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit inflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
inflate = fromZlib Zlib.createInflate inflate = fromZlib Zlib.createInflate
deflate :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit deflate :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
deflate = fromZlib Zlib.createDeflate deflate = fromZlib Zlib.createDeflate
brotliCompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit brotliCompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliCompress = fromZlib Zlib.createBrotliCompress brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: Pipe (Maybe Buffer) (Maybe Buffer) Aff Unit brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => Pipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliDecompress = fromZlib Zlib.createBrotliDecompress brotliDecompress = fromZlib Zlib.createBrotliDecompress