From 08bd9a817a7ca6dd97205159ecaf4020c006fbf4 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Sat, 22 Jun 2024 19:12:31 -0500 Subject: [PATCH] fix: asyncpipe is profunctor --- spago.lock | 1 + spago.yaml | 1 + src/Pipes.Async.purs | 22 ++++++++++++++++++---- src/Pipes.Node.Stream.purs | 2 +- src/Pipes.Node.Zlib.purs | 16 ++++++++-------- 5 files changed, 29 insertions(+), 13 deletions(-) diff --git a/spago.lock b/spago.lock index d985448..1c6c984 100644 --- a/spago.lock +++ b/spago.lock @@ -29,6 +29,7 @@ workspace: - parallel: ">=6.0.0 <7.0.0" - pipes: ">=8.0.0 <9.0.0" - prelude: ">=6.0.1 <7.0.0" + - profunctor - st: ">=6.2.0 <7.0.0" - strings: ">=6.0.1 <7.0.0" - tailrec: ">=6.1.0 <7.0.0" diff --git a/spago.yaml b/spago.yaml index 45f8ba9..336c93a 100644 --- a/spago.yaml +++ b/spago.yaml @@ -36,6 +36,7 @@ package: - parallel: ">=6.0.0 <7.0.0" - pipes: ">=8.0.0 <9.0.0" - prelude: ">=6.0.1 <7.0.0" + - profunctor: ">=6.0.1 <7.0.0" - st: ">=6.2.0 <7.0.0" - strings: ">=6.0.1 <7.0.0" - tailrec: ">=6.1.0 <7.0.0" diff --git a/src/Pipes.Async.purs b/src/Pipes.Async.purs index f341de8..14ee46a 100644 --- a/src/Pipes.Async.purs +++ b/src/Pipes.Async.purs @@ -21,6 +21,7 @@ import Data.Either (Either(..), either) import Data.Generic.Rep (class Generic) import Data.Maybe (Maybe(..), isNothing) import Data.Newtype (unwrap) +import Data.Profunctor (class Profunctor) import Data.Show.Generic (genericShow) import Data.Time.Duration (Milliseconds) import Data.Traversable (traverse_) @@ -92,7 +93,7 @@ instance Show a => Show (ReadResult a) where show = genericShow -- | - Attempt to read a chunk -- | - `x -> m ReadSignal` -- | - Block until the pipe is readable again (or reading must stop) -data AsyncPipe x a b m = +data AsyncPipe x m a b = AsyncPipe (m x) (x -> a -> m WriteResult) @@ -100,9 +101,22 @@ data AsyncPipe x a b m = (x -> m (ReadResult b)) (x -> m ReadSignal) +instance Monad m => Functor (AsyncPipe x m a) where + map f (AsyncPipe init w aw r ar) = AsyncPipe init w aw (map (map f) <<< r) ar + +instance Monad m => Profunctor (AsyncPipe x m) where + dimap :: forall a b c d. (a -> b) -> (c -> d) -> _ b c -> _ a d + dimap ab cd (AsyncPipe init w aw r ar) = + AsyncPipe + init + (\x -> w x <<< ab) + aw + (map (map cd) <<< r) + ar + -- | Wraps all fields of an `AsyncPipe` with logging to debug -- | behavior and timing. -debug :: forall x a b m. MonadAff m => String -> AsyncPipe x a b m -> AsyncPipe x a b m +debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m a b -> AsyncPipe x m a b debug c (AsyncPipe init write awaitWrite read awaitRead) = let logL m = liftEffect $ log $ "[" <> c <> "] " <> m @@ -188,7 +202,7 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) = -- | * `read` will pass chunks to `parse` as fast as `parse` allows -- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready -- | * `encode` will encode chunks and yield them to `write` as soon as they're ready -sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x (Maybe a) (Maybe b) m -> Pipe (Maybe a) (Maybe b) m Unit +sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x m (Maybe a) (Maybe b) -> Pipe (Maybe a) (Maybe b) m Unit sync (AsyncPipe init write awaitWrite read awaitRead) = let liftPipe :: forall r. (Proxy _ _ _ _ m) r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r @@ -243,7 +257,7 @@ pipeAsync => MonadAff m => MonadBracket Error f m => Producer (Maybe a) m Unit - -> AsyncPipe x (Maybe a) (Maybe b) m + -> AsyncPipe x m (Maybe a) (Maybe b) -> Producer (Maybe b) m Unit pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) = do diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index fe93550..5a2d4fc 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -113,7 +113,7 @@ fromTransform . MonadThrow Error m => MonadAff m => Effect (O.Transform a b) - -> AsyncPipe (TransformContext a b) (Maybe a) (Maybe b) m + -> AsyncPipe (TransformContext a b) m (Maybe a) (Maybe b) fromTransform t = let init = do diff --git a/src/Pipes.Node.Zlib.purs b/src/Pipes.Node.Zlib.purs index e904755..52eed7f 100644 --- a/src/Pipes.Node.Zlib.purs +++ b/src/Pipes.Node.Zlib.purs @@ -17,29 +17,29 @@ import Pipes.Node.Stream (TransformContext, fromTransform) type X = TransformContext Buffer Buffer -fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X (Maybe Buffer) (Maybe Buffer) m +fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X m (Maybe Buffer) (Maybe Buffer) fromZlib z = fromTransform do raw <- liftEffect $ Zlib.toDuplex <$> z pure $ O.unsafeCoerceTransform raw -gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m +gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) gzip = fromZlib Zlib.createGzip -gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m +gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) gunzip = fromZlib Zlib.createGunzip -unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m +unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) unzip = fromZlib Zlib.createUnzip -inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m +inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) inflate = fromZlib Zlib.createInflate -deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m +deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) deflate = fromZlib Zlib.createDeflate -brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m +brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) brotliCompress = fromZlib Zlib.createBrotliCompress -brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m +brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) brotliDecompress = fromZlib Zlib.createBrotliDecompress