From 860ace3990595407114d5f485679cdc9914a27f4 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Sun, 23 Jun 2024 18:33:45 -0500 Subject: [PATCH] feat: AsyncPipe really should be a monad --- spago.lock | 1 - spago.yaml | 1 - src/Pipes.Async.purs | 334 ++++++++++++++++++------------- src/Pipes.Node.Stream.purs | 119 +++++------ src/Pipes.Node.Zlib.purs | 24 +-- test/Test/Pipes.Node.Stream.purs | 25 ++- 6 files changed, 276 insertions(+), 228 deletions(-) diff --git a/spago.lock b/spago.lock index 69a35c0..d985448 100644 --- a/spago.lock +++ b/spago.lock @@ -29,7 +29,6 @@ workspace: - parallel: ">=6.0.0 <7.0.0" - pipes: ">=8.0.0 <9.0.0" - prelude: ">=6.0.1 <7.0.0" - - profunctor: ">=6.0.1 <7.0.0" - st: ">=6.2.0 <7.0.0" - strings: ">=6.0.1 <7.0.0" - tailrec: ">=6.1.0 <7.0.0" diff --git a/spago.yaml b/spago.yaml index ac6debb..eb520ec 100644 --- a/spago.yaml +++ b/spago.yaml @@ -36,7 +36,6 @@ package: - parallel: ">=6.0.0 <7.0.0" - pipes: ">=8.0.0 <9.0.0" - prelude: ">=6.0.1 <7.0.0" - - profunctor: ">=6.0.1 <7.0.0" - st: ">=6.2.0 <7.0.0" - strings: ">=6.0.1 <7.0.0" - tailrec: ">=6.1.0 <7.0.0" diff --git a/src/Pipes.Async.purs b/src/Pipes.Async.purs index f589254..4feffd1 100644 --- a/src/Pipes.Async.purs +++ b/src/Pipes.Async.purs @@ -3,10 +3,11 @@ module Pipes.Async where import Prelude hiding (join) import Control.Alternative (class Alternative, empty, guard) -import Control.Monad.Error.Class (class MonadError, catchError, throwError) +import Control.Monad.Cont (class MonadTrans) +import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError) import Control.Monad.Except (ExceptT, runExceptT) import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork) -import Control.Monad.Maybe.Trans (runMaybeT) +import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT) import Control.Monad.Morph (hoist) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.ST.Class (liftST) @@ -14,30 +15,30 @@ import Control.Monad.ST.Ref (STRef) import Control.Monad.ST.Ref as ST.Ref import Control.Monad.Trans.Class (lift) import Control.Parallel (class Parallel, parOneOf) -import Data.Array (fold) import Data.Array as Array import Data.DateTime.Instant as Instant import Data.Either (Either(..), either) +import Data.Foldable (class Foldable, fold) import Data.Generic.Rep (class Generic) -import Data.Maybe (Maybe(..), isNothing) +import Data.Maybe (Maybe(..), fromMaybe, isNothing) import Data.Newtype (unwrap) -import Data.Profunctor (class Profunctor) import Data.Show.Generic (genericShow) import Data.Time.Duration (Milliseconds) -import Data.Traversable (traverse_) +import Data.Traversable (class Traversable, traverse, traverse_) import Data.Tuple.Nested (type (/\), (/\)) -import Effect.Aff.Class (class MonadAff) -import Effect.Class (liftEffect) +import Effect.Aff.Class (class MonadAff, liftAff) +import Effect.Class (class MonadEffect, liftEffect) import Effect.Console (log) -import Effect.Exception (Error) +import Effect.Exception (Error, error) import Effect.Now as Now import Pipes (await, yield) import Pipes.Collect as Collect -import Pipes.Core (Pipe, Producer, Proxy) +import Pipes.Core (Pipe, Proxy, Producer) data WriteSignal = WriteSignalOk | WriteSignalEnded + derive instance Generic WriteSignal _ derive instance Eq WriteSignal derive instance Ord WriteSignal @@ -62,6 +63,7 @@ data WriteResult = WriteAgain | WriteNeedsDrain | WriteEnded + derive instance Generic WriteResult _ derive instance Eq WriteResult derive instance Ord WriteResult @@ -70,96 +72,148 @@ instance Show WriteResult where show = genericShow data ReadResult a = ReadOk a | ReadWouldBlock + derive instance Generic (ReadResult a) _ derive instance Eq a => Eq (ReadResult a) derive instance Ord a => Ord (ReadResult a) derive instance Functor ReadResult +derive instance Foldable ReadResult +derive instance Traversable ReadResult instance Show a => Show (ReadResult a) where show = genericShow +type AsyncIO a b m r = + { write :: a -> m WriteResult + , read :: m (ReadResult b) + , awaitWrite :: m WriteSignal + , awaitRead :: m ReadSignal + } + /\ AsyncPipe a b m r + -- | An `AsyncPipe` is a `Pipe`-like struct that allows -- | concurrently reading from a `Producer` and writing to a `Consumer`. -- | -- | An implementation of `AsyncPipe` for Node `Transform` streams -- | is provided in `Pipes.Node.Stream`. --- | --- | ## Fields --- | - `m x` --- | - Initializer --- | - `x -> a -> m WriteResult` --- | - Write a value `a` to the underlying resource --- | - `x -> m WriteSignal` --- | - Block until the pipe is writable again (or writing must stop) --- | - `x -> m (ReadResult b)` --- | - Attempt to read a chunk --- | - `x -> m ReadSignal` --- | - Block until the pipe is readable again (or reading must stop) -data AsyncPipe x m a b = - AsyncPipe - (m x) - (x -> a -> m WriteResult) - (x -> m WriteSignal) - (x -> m (ReadResult b)) - (x -> m ReadSignal) +data AsyncPipe a b m r + -- | A pure return value + = Pure r + -- | An `AsyncPipe` behind a computation + | M (m (AsyncPipe a b m r)) + -- | Interface to write & read from the backing resource + | AsyncIO (AsyncIO a b m r) -instance Monad m => Functor (AsyncPipe x m a) where - map f (AsyncPipe init w aw r ar) = AsyncPipe init w aw (map (map f) <<< r) ar +-- | Modify request / response types +mapIO :: forall aa ab ba bb m r. Monad m => (ab -> aa) -> (ba -> bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r +mapIO _ _ (Pure a) = Pure a +mapIO a b (M m) = M $ mapIO a b <$> m +mapIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) = + AsyncIO $ {write: write <<< a, awaitWrite, read: map b <$> read, awaitRead} /\ mapIO a b m -instance Monad m => Profunctor (AsyncPipe x m) where - dimap :: forall a b c d. (a -> b) -> (c -> d) -> _ b c -> _ a d - dimap ab cd (AsyncPipe init w aw r ar) = - AsyncPipe - init - (\x -> w x <<< ab) - aw - (map (map cd) <<< r) - ar +-- | Modify request / response types +bindIO :: forall aa ab ba bb m r. Monad m => (ab -> m aa) -> (ba -> m bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r +bindIO _ _ (Pure a) = Pure a +bindIO a b (M m) = M $ bindIO a b <$> m +bindIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) = + AsyncIO $ {write: flip bind write <<< a, awaitWrite, read: traverse b =<< read, awaitRead} /\ bindIO a b m + +-- | Remove the `AsyncPipe` wrapper by discarding the IO +stripIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m r +stripIO (Pure r) = pure r +stripIO (M m) = m >>= stripIO +stripIO (AsyncIO (_ /\ m)) = stripIO m + +-- | Execute the `AsyncPipe` monad stack until `AsyncIO` is reached (if any) +getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r)) +getAsyncIO (AsyncIO a) = pure $ Just a +getAsyncIO (M m) = m >>= getAsyncIO +getAsyncIO (Pure _) = pure Nothing + +instance MonadTrans (AsyncPipe a b) where + lift = M <<< map Pure + +instance Monad m => Functor (AsyncPipe a b m) where + map f (Pure r) = Pure $ f r + map f (M m) = M $ map f <$> m + map f (AsyncIO (io /\ m)) = AsyncIO $ io /\ (f <$> m) + +instance Monad m => Apply (AsyncPipe a b m) where + apply (Pure f) ma = f <$> ma + apply (M mf) ma = M $ (_ <*> ma) <$> mf + apply (AsyncIO (io /\ mf)) ma = AsyncIO $ io /\ (mf <*> ma) + +instance Monad m => Applicative (AsyncPipe a b m) where + pure = Pure + +instance Monad m => Bind (AsyncPipe a b m) where + bind (Pure a) f = f a + bind (M ma) f = M $ (_ >>= f) <$> ma + bind (AsyncIO (io /\ m)) f = AsyncIO $ io /\ (m >>= f) + +instance Monad m => Monad (AsyncPipe a b m) + +instance MonadThrow e m => MonadThrow e (AsyncPipe a b m) where + throwError = lift <<< throwError + +instance MonadError e m => MonadError e (AsyncPipe a b m) where + catchError m f = lift $ catchError (stripIO m) (stripIO <<< f) + +instance MonadEffect m => MonadEffect (AsyncPipe a b m) where + liftEffect = lift <<< liftEffect + +instance MonadAff m => MonadAff (AsyncPipe a b m) where + liftAff = lift <<< liftAff -- | Wraps all fields of an `AsyncPipe` with logging to debug -- | behavior and timing. -debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m (Maybe a) (Maybe b) -> AsyncPipe x m (Maybe a) (Maybe b) -debug c (AsyncPipe init write awaitWrite read awaitRead) = +debug :: forall a b m r. MonadAff m => String -> AsyncPipe (Maybe a) (Maybe b) m r -> AsyncPipe (Maybe a) (Maybe b) m r +debug c m = let - logL m = liftEffect $ log $ "[" <> c <> "] " <> m - logR m = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> m + logL :: forall m'. MonadEffect m' => _ -> m' Unit + logL msg = liftEffect $ log $ "[" <> c <> "] " <> msg + logR :: forall m'. MonadEffect m' => _ -> m' Unit + logR msg = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> msg - time :: forall a'. m a' -> m (Milliseconds /\ a') - time m = do + time :: forall m' a'. MonadEffect m' => m' a' -> m' (Milliseconds /\ a') + time ma = do start <- liftEffect Now.now - a <- m + a <- ma end <- liftEffect Now.now pure $ (end `Instant.diff` start) /\ a - - init' = do - logL "init >" - elapsed /\ x <- time init - logL $ "< init " <> "(" <> show (unwrap elapsed) <> "ms)" - pure x - - write' x a = do - logL "write >" - elapsed /\ w <- time $ write x a - logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)" - pure w - - read' x = do - logR "read >" - elapsed /\ r <- time $ read x - logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)" - pure r - - awaitWrite' x = do - logL "awaitWrite >" - elapsed /\ w <- time $ awaitWrite x - logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)" - pure w - - awaitRead' x = do - logR "awaitRead >" - elapsed /\ r <- time $ awaitRead x - logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)" - pure r in - AsyncPipe init' write' awaitWrite' read' awaitRead' + flip bind (fromMaybe m) + $ runMaybeT do + (io /\ done') <- MaybeT $ lift $ getAsyncIO m + let + write a = do + logL "write >" + elapsed /\ w <- time $ io.write a + logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)" + pure w + + read = do + logR "read >" + elapsed /\ r <- time $ io.read + logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)" + pure r + + awaitWrite = do + logL "awaitWrite >" + elapsed /\ w <- time $ io.awaitWrite + logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)" + pure w + + awaitRead = do + logR "awaitRead >" + elapsed /\ r <- time $ io.awaitRead + logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)" + pure r + + done = do + logL "done >" + elapsed /\ r <- time done' + logL $ "< done (" <> show (unwrap elapsed) <> "ms)" + pure r + pure $ AsyncIO $ {write, read, awaitWrite, awaitRead} /\ done -- | Convert an `AsyncPipe` to a regular `Pipe`. -- | @@ -202,43 +256,47 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) = -- | * `read` will pass chunks to `parse` as fast as `parse` allows -- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready -- | * `encode` will encode chunks and yield them to `write` as soon as they're ready -sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x m (Maybe a) (Maybe b) -> Pipe (Maybe a) (Maybe b) m Unit -sync (AsyncPipe init write awaitWrite read awaitRead) = +sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r +sync m = let - liftPipe :: forall r. (Proxy _ _ _ _ m) r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r + liftPipe :: forall r'. (Proxy _ _ _ _ m) r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r' liftPipe = lift - liftM :: forall r. m r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r + liftM :: forall r'. m r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r' liftM = liftPipe <<< lift + in + lift (getAsyncIO m) >>= + case _ of + Nothing -> lift $ stripIO m + Just ({write, awaitWrite, read, awaitRead} /\ done) -> + let + awaitRW onR onW = + liftM (parOneOf [Right <$> awaitWrite, Left <$> awaitRead]) + >>= either (const onR) onW - continue a = throwError (Loop a) - break = throwError (Done unit) + wSignal WriteSignalOk = WriteAgain + wSignal WriteSignalEnded = WriteEnded - awaitRW x = parOneOf [Right <$> awaitWrite x, Left <$> awaitRead x] + tailRecEarly f a = tailRecM (map (either identity identity) <<< runExceptT <<< f) a + continue a = throwError (Loop a) + break = (Done <$> liftM (stripIO done)) >>= throwError + in do + flip tailRecEarly WriteAgain \writable -> do + rb <- liftM read + case rb of + ReadWouldBlock + | writable == WriteEnded -> liftM awaitRead *> continue writable + | writable == WriteNeedsDrain -> awaitRW (continue writable) (continue <<< wSignal) + | otherwise -> pure unit + ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue writable + ReadOk Nothing -> liftPipe (yield Nothing) *> break - wSignal WriteSignalOk = WriteAgain - wSignal WriteSignalEnded = WriteEnded - in do - x <- lift init - flip tailRecM WriteAgain - \w -> - map (either identity identity) - $ runExceptT do - rb <- liftM $ read x - case rb of - ReadWouldBlock - | w == WriteEnded -> liftM (awaitRead x) *> continue w - | w == WriteNeedsDrain -> liftM (awaitRW x) >>= either (const $ continue w) (continue <<< wSignal) - | otherwise -> pure unit - ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue w - ReadOk Nothing -> liftPipe (yield Nothing) *> break + when (writable /= WriteAgain) $ continue writable - when (w /= WriteAgain) $ continue w - - a <- liftPipe await - w' <- liftM $ write x a - when (isNothing a) $ continue WriteEnded - pure $ Loop w' + a <- liftPipe await + writable' <- liftM $ write a + when (isNothing a) $ continue WriteEnded + pure $ Loop writable' -- | Implementation of `(>-/->)` -- | @@ -252,44 +310,48 @@ sync (AsyncPipe init write awaitWrite read awaitRead) = -- | -- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown. pipeAsync - :: forall f m x a b + :: forall f m a b . MonadRec m => MonadAff m => MonadBracket Error f m => Producer (Maybe a) m Unit - -> AsyncPipe x m (Maybe a) (Maybe b) + -> AsyncPipe (Maybe a) (Maybe b) m Unit -> Producer (Maybe b) m Unit -pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) = - do - errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing - killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false +pipeAsync prod m = + lift (getAsyncIO m) + >>= case _ of + Nothing -> throwError $ error "`pipeAsync` invoked on `AsyncPipe` that did not have `AsyncIO`" + Just ({write, read, awaitWrite, awaitRead} /\ done) -> do + errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing + killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false - let - killThread = void $ liftEffect $ liftST $ ST.Ref.write true killST - threadKilled = liftEffect $ liftST $ ST.Ref.read killST - putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just - getThreadError = liftEffect $ liftST $ ST.Ref.read errST + let + killThread = void $ liftEffect $ liftST $ ST.Ref.write true killST + threadKilled = liftEffect $ liftST $ ST.Ref.read killST + putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just + getThreadError = liftEffect $ liftST $ ST.Ref.read errST - rx x a = do - killed <- threadKilled - guard $ not killed - w <- lift $ write x a - case w of - WriteNeedsDrain -> lift $ void $ awaitWrite x - WriteEnded -> empty - WriteAgain -> pure unit + rx a = do + killed <- threadKilled + guard $ not killed + w <- lift $ write a + case w of + WriteNeedsDrain -> lift $ void awaitWrite + WriteEnded -> empty + WriteAgain -> pure unit - spawn = lift <<< fork <<< flip catchError putThreadError + spawn = lift <<< fork <<< flip catchError putThreadError - x <- lift init - _thread <- spawn $ void $ runMaybeT $ Collect.foreach (rx x) (hoist lift prod) + _thread <- spawn $ void $ runMaybeT $ Collect.foreach rx (hoist lift prod) - flip tailRecM unit $ const do - getThreadError >>= traverse_ throwError - rb <- lift $ read x - case rb of - ReadOk (Just b) -> yield (Just b) $> Loop unit - ReadOk Nothing -> killThread *> yield Nothing $> Done unit - ReadWouldBlock -> void (lift (awaitRead x)) $> Loop unit + flip tailRecM unit $ const do + getThreadError >>= traverse_ throwError + rb <- lift read + case rb of + ReadOk (Just b) -> yield (Just b) $> Loop unit + ReadOk Nothing -> killThread *> yield Nothing $> Done unit + ReadWouldBlock -> void (lift awaitRead) $> Loop unit + + lift $ stripIO done infixl 7 pipeAsync as >-/-> diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index 3bcb662..c02bf8d 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -5,8 +5,6 @@ import Prelude hiding (join) import Control.Monad.Error.Class (class MonadThrow, throwError) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.ST.Class (liftST) -import Control.Monad.ST.Global (Global) -import Control.Monad.ST.Ref (STRef) import Control.Monad.ST.Ref as ST.Ref import Control.Monad.ST.Ref as STRef import Control.Monad.Trans.Class (lift) @@ -15,9 +13,9 @@ import Data.Traversable (for_, traverse_) import Data.Tuple.Nested ((/\)) import Effect (Effect) import Effect.Aff.Class (class MonadAff, liftAff) -import Effect.Class (class MonadEffect, liftEffect) +import Effect.Class (liftEffect) import Effect.Exception (Error) -import Node.Stream.Object (WriteResult(..), maybeReadResult) +import Node.Stream.Object (ReadResult(..), WriteResult(..)) import Node.Stream.Object as O import Pipes (await, yield) import Pipes (for) as P @@ -90,77 +88,60 @@ fromWritable w = do tailRecM go unit -newtype TransformContext a b = - TransformContext - { stream :: O.Transform a b - , removeErrorListener :: Effect Unit - , errorST :: STRef Global (Maybe Error) - } - -transformCleanup :: forall m a b. MonadEffect m => TransformContext a b -> m Unit -transformCleanup (TransformContext {removeErrorListener}) = do - liftEffect removeErrorListener - -transformStream :: forall a b. TransformContext a b -> O.Transform a b -transformStream (TransformContext {stream}) = stream - -transformRethrow :: forall m a b. MonadThrow Error m => MonadEffect m => TransformContext a b -> m Unit -transformRethrow (TransformContext {errorST}) = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST) +fromTransformEffect + :: forall a b m + . MonadThrow Error m + => MonadAff m + => Effect (O.Transform a b) + -> AsyncPipe (Maybe a) (Maybe b) m Unit +fromTransformEffect = fromTransform <=< liftEffect -- | Convert a `Transform` stream to an `AsyncPipe`. fromTransform :: forall a b m . MonadThrow Error m => MonadAff m - => Effect (O.Transform a b) - -> AsyncPipe (TransformContext a b) m (Maybe a) (Maybe b) -fromTransform t = - let - init = do - stream <- liftEffect t - { error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream - pure $ TransformContext {errorST, removeErrorListener, stream} - write x Nothing = do - let s = transformStream x - liftEffect $ O.end s - pure AsyncPipe.WriteEnded - write x (Just a) = do - transformRethrow x - let s = transformStream x - w <- liftEffect $ O.write s a - pure $ case w of - WriteOk -> AsyncPipe.WriteAgain - WriteWouldBlock -> AsyncPipe.WriteNeedsDrain - awaitWrite x = do - transformRethrow x - let s = transformStream x - liftAff $ O.awaitWritableOrClosed s - ended <- liftEffect $ O.isWritableEnded s - if ended then - pure $ AsyncPipe.WriteSignalEnded - else do - liftAff $ O.awaitWritableOrClosed s - pure $ AsyncPipe.WriteSignalOk - read x = - do - transformRethrow x - let s = transformStream x - readEnded <- liftEffect $ O.isReadableEnded s - if readEnded then do - pure $ AsyncPipe.ReadOk Nothing - else - maybe AsyncPipe.ReadWouldBlock (AsyncPipe.ReadOk <<< Just) <$> maybeReadResult <$> liftEffect (O.read s) - awaitRead x = do - transformRethrow x - let s = transformStream x - ended <- liftEffect $ O.isReadableEnded s - if ended then - pure $ AsyncPipe.ReadSignalEnded - else do - liftAff $ O.awaitReadableOrClosed s - pure $ AsyncPipe.ReadSignalOk - in - AsyncPipe init write awaitWrite read awaitRead + => O.Transform a b + -> AsyncPipe (Maybe a) (Maybe b) m Unit +fromTransform stream = do + { error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream + let + rethrow = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST) + cleanup = liftEffect removeErrorListener + + writeSignal = + liftEffect (O.isWritableEnded stream) + <#> if _ then AsyncPipe.WriteSignalEnded else AsyncPipe.WriteSignalOk + + readSignal = + liftEffect (O.isReadableEnded stream) + <#> if _ then AsyncPipe.ReadSignalEnded else AsyncPipe.ReadSignalOk + + writeResult WriteOk = AsyncPipe.WriteAgain + writeResult WriteWouldBlock = AsyncPipe.WriteNeedsDrain + + readResult (ReadJust a) = AsyncPipe.ReadOk (Just a) + readResult ReadWouldBlock = AsyncPipe.ReadWouldBlock + + awaitWritable = liftAff $ O.awaitWritableOrClosed stream + awaitReadable = liftAff $ O.awaitReadableOrClosed stream + + awaitWrite = rethrow *> awaitWritable *> writeSignal + awaitRead = rethrow *> awaitReadable *> readSignal + + whenReadNotEnded m = + liftEffect (O.isReadableEnded stream) + >>= if _ then pure $ AsyncPipe.ReadOk Nothing else m + + readNow = readResult <$> liftEffect (O.read stream) + writeNow a = writeResult <$> liftEffect (O.write stream a) + + read = rethrow *> whenReadNotEnded readNow + + write Nothing = liftEffect (O.end stream) $> AsyncPipe.WriteEnded + write (Just a) = rethrow *> writeNow a + + AsyncIO ({write, awaitWrite, read, awaitRead} /\ cleanup) -- | Given a `Producer` of values, wrap them in `Just`. -- | diff --git a/src/Pipes.Node.Zlib.purs b/src/Pipes.Node.Zlib.purs index 52eed7f..372fdac 100644 --- a/src/Pipes.Node.Zlib.purs +++ b/src/Pipes.Node.Zlib.purs @@ -13,33 +13,31 @@ import Node.Stream.Object as O import Node.Zlib as Zlib import Node.Zlib.Types (ZlibStream) import Pipes.Async (AsyncPipe) -import Pipes.Node.Stream (TransformContext, fromTransform) +import Pipes.Node.Stream (fromTransform) -type X = TransformContext Buffer Buffer - -fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X m (Maybe Buffer) (Maybe Buffer) +fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit fromZlib z = - fromTransform do + do raw <- liftEffect $ Zlib.toDuplex <$> z - pure $ O.unsafeCoerceTransform raw + fromTransform $ O.unsafeCoerceTransform raw -gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) +gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit gzip = fromZlib Zlib.createGzip -gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) +gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit gunzip = fromZlib Zlib.createGunzip -unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) +unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit unzip = fromZlib Zlib.createUnzip -inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) +inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit inflate = fromZlib Zlib.createInflate -deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) +deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit deflate = fromZlib Zlib.createDeflate -brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) +brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit brotliCompress = fromZlib Zlib.createBrotliCompress -brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) +brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit brotliDecompress = fromZlib Zlib.createBrotliDecompress diff --git a/test/Test/Pipes.Node.Stream.purs b/test/Test/Pipes.Node.Stream.purs index 133001b..b1a0efd 100644 --- a/test/Test/Pipes.Node.Stream.purs +++ b/test/Test/Pipes.Node.Stream.purs @@ -150,8 +150,8 @@ spec = cbor :: Buffer <- Pipe.Collect.toBuffer $ Pipe.FS.read a >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) - >-/-> Pipe.Node.fromTransform csvDecode - >-/-> Pipe.Node.fromTransform cborEncode + >-/-> Pipe.Node.fromTransformEffect csvDecode + >-/-> Pipe.Node.fromTransformEffect cborEncode >-> Pipe.Node.unEOS f :: Array Foreign <- liftEffect $ cborDecodeSync cbor ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f @@ -163,8 +163,8 @@ spec = cbor :: Buffer <- Pipe.Collect.toBuffer $ Pipe.FS.read a >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) - >-> sync (Pipe.Node.fromTransform csvDecode) - >-> sync (Pipe.Node.fromTransform cborEncode) + >-> sync (Pipe.Node.fromTransformEffect csvDecode) + >-> sync (Pipe.Node.fromTransformEffect cborEncode) >-> Pipe.Node.unEOS f :: Array Foreign <- liftEffect $ cborDecodeSync cbor ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f @@ -188,7 +188,7 @@ spec = Pipe.Collect.toMonoid $ Pipe.FS.read a >-/-> Pipe.Zlib.gzip - >-/-> Pipe.Node.fromTransform slowTransform + >-/-> Pipe.Node.fromTransformEffect slowTransform >-/-> Pipe.Zlib.gunzip >-> Pipe.Node.unEOS >-> Pipe.Buffer.toString UTF8 @@ -201,7 +201,7 @@ spec = Pipe.Collect.toMonoid $ Pipe.FS.read a >-> sync Pipe.Zlib.gzip - >-> sync (Pipe.Node.fromTransform slowTransform) + >-> sync (Pipe.Node.fromTransformEffect slowTransform) >-> sync Pipe.Zlib.gunzip >-> Pipe.Node.unEOS >-> Pipe.Buffer.toString UTF8 @@ -210,10 +210,19 @@ spec = around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do liftEffect $ FS.writeTextFile UTF8 p "foo" r <- reader p - out :: List.List Int <- Pipe.toListM $ r >-/-> Pipe.Node.fromTransform discardTransform >-> Pipe.Node.unEOS + out :: List.List Int <- + Pipe.toListM + $ r + >-/-> Pipe.Node.fromTransformEffect discardTransform + >-> Pipe.Node.unEOS out `shouldEqual` List.Nil around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do liftEffect $ FS.writeTextFile UTF8 p "foo bar" r <- reader p - out :: List.List String <- Pipe.toListM $ r >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) >-/-> Pipe.Node.fromTransform charsTransform >-> Pipe.Node.unEOS + out :: List.List String <- + Pipe.toListM $ + r + >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) + >-/-> Pipe.Node.fromTransformEffect charsTransform + >-> Pipe.Node.unEOS out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]