fix: asyncpipe is profunctor

This commit is contained in:
orion 2024-06-22 19:12:31 -05:00
parent 970d890a00
commit 08bd9a817a
Signed by: orion
GPG Key ID: 6D4165AE4C928719
5 changed files with 29 additions and 13 deletions

View File

@ -29,6 +29,7 @@ workspace:
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- profunctor
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"

View File

@ -36,6 +36,7 @@ package:
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"

View File

@ -21,6 +21,7 @@ import Data.Either (Either(..), either)
import Data.Generic.Rep (class Generic) import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..), isNothing) import Data.Maybe (Maybe(..), isNothing)
import Data.Newtype (unwrap) import Data.Newtype (unwrap)
import Data.Profunctor (class Profunctor)
import Data.Show.Generic (genericShow) import Data.Show.Generic (genericShow)
import Data.Time.Duration (Milliseconds) import Data.Time.Duration (Milliseconds)
import Data.Traversable (traverse_) import Data.Traversable (traverse_)
@ -92,7 +93,7 @@ instance Show a => Show (ReadResult a) where show = genericShow
-- | - Attempt to read a chunk -- | - Attempt to read a chunk
-- | - `x -> m ReadSignal` -- | - `x -> m ReadSignal`
-- | - Block until the pipe is readable again (or reading must stop) -- | - Block until the pipe is readable again (or reading must stop)
data AsyncPipe x a b m = data AsyncPipe x m a b =
AsyncPipe AsyncPipe
(m x) (m x)
(x -> a -> m WriteResult) (x -> a -> m WriteResult)
@ -100,9 +101,22 @@ data AsyncPipe x a b m =
(x -> m (ReadResult b)) (x -> m (ReadResult b))
(x -> m ReadSignal) (x -> m ReadSignal)
instance Monad m => Functor (AsyncPipe x m a) where
map f (AsyncPipe init w aw r ar) = AsyncPipe init w aw (map (map f) <<< r) ar
instance Monad m => Profunctor (AsyncPipe x m) where
dimap :: forall a b c d. (a -> b) -> (c -> d) -> _ b c -> _ a d
dimap ab cd (AsyncPipe init w aw r ar) =
AsyncPipe
init
(\x -> w x <<< ab)
aw
(map (map cd) <<< r)
ar
-- | Wraps all fields of an `AsyncPipe` with logging to debug -- | Wraps all fields of an `AsyncPipe` with logging to debug
-- | behavior and timing. -- | behavior and timing.
debug :: forall x a b m. MonadAff m => String -> AsyncPipe x a b m -> AsyncPipe x a b m debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m a b -> AsyncPipe x m a b
debug c (AsyncPipe init write awaitWrite read awaitRead) = debug c (AsyncPipe init write awaitWrite read awaitRead) =
let let
logL m = liftEffect $ log $ "[" <> c <> "] " <> m logL m = liftEffect $ log $ "[" <> c <> "] " <> m
@ -188,7 +202,7 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) =
-- | * `read` will pass chunks to `parse` as fast as `parse` allows -- | * `read` will pass chunks to `parse` as fast as `parse` allows
-- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready -- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready
-- | * `encode` will encode chunks and yield them to `write` as soon as they're ready -- | * `encode` will encode chunks and yield them to `write` as soon as they're ready
sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x (Maybe a) (Maybe b) m -> Pipe (Maybe a) (Maybe b) m Unit sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x m (Maybe a) (Maybe b) -> Pipe (Maybe a) (Maybe b) m Unit
sync (AsyncPipe init write awaitWrite read awaitRead) = sync (AsyncPipe init write awaitWrite read awaitRead) =
let let
liftPipe :: forall r. (Proxy _ _ _ _ m) r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r liftPipe :: forall r. (Proxy _ _ _ _ m) r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r
@ -243,7 +257,7 @@ pipeAsync
=> MonadAff m => MonadAff m
=> MonadBracket Error f m => MonadBracket Error f m
=> Producer (Maybe a) m Unit => Producer (Maybe a) m Unit
-> AsyncPipe x (Maybe a) (Maybe b) m -> AsyncPipe x m (Maybe a) (Maybe b)
-> Producer (Maybe b) m Unit -> Producer (Maybe b) m Unit
pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) = pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) =
do do

View File

@ -113,7 +113,7 @@ fromTransform
. MonadThrow Error m . MonadThrow Error m
=> MonadAff m => MonadAff m
=> Effect (O.Transform a b) => Effect (O.Transform a b)
-> AsyncPipe (TransformContext a b) (Maybe a) (Maybe b) m -> AsyncPipe (TransformContext a b) m (Maybe a) (Maybe b)
fromTransform t = fromTransform t =
let let
init = do init = do

View File

@ -17,29 +17,29 @@ import Pipes.Node.Stream (TransformContext, fromTransform)
type X = TransformContext Buffer Buffer type X = TransformContext Buffer Buffer
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X (Maybe Buffer) (Maybe Buffer) m fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
fromZlib z = fromZlib z =
fromTransform do fromTransform do
raw <- liftEffect $ Zlib.toDuplex <$> z raw <- liftEffect $ Zlib.toDuplex <$> z
pure $ O.unsafeCoerceTransform raw pure $ O.unsafeCoerceTransform raw
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
gzip = fromZlib Zlib.createGzip gzip = fromZlib Zlib.createGzip
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
gunzip = fromZlib Zlib.createGunzip gunzip = fromZlib Zlib.createGunzip
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
unzip = fromZlib Zlib.createUnzip unzip = fromZlib Zlib.createUnzip
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
inflate = fromZlib Zlib.createInflate inflate = fromZlib Zlib.createInflate
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
deflate = fromZlib Zlib.createDeflate deflate = fromZlib Zlib.createDeflate
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
brotliCompress = fromZlib Zlib.createBrotliCompress brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X (Maybe Buffer) (Maybe Buffer) m brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
brotliDecompress = fromZlib Zlib.createBrotliDecompress brotliDecompress = fromZlib Zlib.createBrotliDecompress