feat: AsyncPipe really should be a monad

This commit is contained in:
orion 2024-06-23 18:33:45 -05:00
parent 7f11c303fb
commit 860ace3990
Signed by: orion
GPG Key ID: 6D4165AE4C928719
6 changed files with 276 additions and 228 deletions

View File

@ -29,7 +29,6 @@ workspace:
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"

View File

@ -36,7 +36,6 @@ package:
- parallel: ">=6.0.0 <7.0.0" - parallel: ">=6.0.0 <7.0.0"
- pipes: ">=8.0.0 <9.0.0" - pipes: ">=8.0.0 <9.0.0"
- prelude: ">=6.0.1 <7.0.0" - prelude: ">=6.0.1 <7.0.0"
- profunctor: ">=6.0.1 <7.0.0"
- st: ">=6.2.0 <7.0.0" - st: ">=6.2.0 <7.0.0"
- strings: ">=6.0.1 <7.0.0" - strings: ">=6.0.1 <7.0.0"
- tailrec: ">=6.1.0 <7.0.0" - tailrec: ">=6.1.0 <7.0.0"

View File

@ -3,10 +3,11 @@ module Pipes.Async where
import Prelude hiding (join) import Prelude hiding (join)
import Control.Alternative (class Alternative, empty, guard) import Control.Alternative (class Alternative, empty, guard)
import Control.Monad.Error.Class (class MonadError, catchError, throwError) import Control.Monad.Cont (class MonadTrans)
import Control.Monad.Error.Class (class MonadError, class MonadThrow, catchError, throwError)
import Control.Monad.Except (ExceptT, runExceptT) import Control.Monad.Except (ExceptT, runExceptT)
import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork) import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork)
import Control.Monad.Maybe.Trans (runMaybeT) import Control.Monad.Maybe.Trans (MaybeT(..), runMaybeT)
import Control.Monad.Morph (hoist) import Control.Monad.Morph (hoist)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
@ -14,30 +15,30 @@ import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
import Control.Parallel (class Parallel, parOneOf) import Control.Parallel (class Parallel, parOneOf)
import Data.Array (fold)
import Data.Array as Array import Data.Array as Array
import Data.DateTime.Instant as Instant import Data.DateTime.Instant as Instant
import Data.Either (Either(..), either) import Data.Either (Either(..), either)
import Data.Foldable (class Foldable, fold)
import Data.Generic.Rep (class Generic) import Data.Generic.Rep (class Generic)
import Data.Maybe (Maybe(..), isNothing) import Data.Maybe (Maybe(..), fromMaybe, isNothing)
import Data.Newtype (unwrap) import Data.Newtype (unwrap)
import Data.Profunctor (class Profunctor)
import Data.Show.Generic (genericShow) import Data.Show.Generic (genericShow)
import Data.Time.Duration (Milliseconds) import Data.Time.Duration (Milliseconds)
import Data.Traversable (traverse_) import Data.Traversable (class Traversable, traverse, traverse_)
import Data.Tuple.Nested (type (/\), (/\)) import Data.Tuple.Nested (type (/\), (/\))
import Effect.Aff.Class (class MonadAff) import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (liftEffect) import Effect.Class (class MonadEffect, liftEffect)
import Effect.Console (log) import Effect.Console (log)
import Effect.Exception (Error) import Effect.Exception (Error, error)
import Effect.Now as Now import Effect.Now as Now
import Pipes (await, yield) import Pipes (await, yield)
import Pipes.Collect as Collect import Pipes.Collect as Collect
import Pipes.Core (Pipe, Producer, Proxy) import Pipes.Core (Pipe, Proxy, Producer)
data WriteSignal data WriteSignal
= WriteSignalOk = WriteSignalOk
| WriteSignalEnded | WriteSignalEnded
derive instance Generic WriteSignal _ derive instance Generic WriteSignal _
derive instance Eq WriteSignal derive instance Eq WriteSignal
derive instance Ord WriteSignal derive instance Ord WriteSignal
@ -62,6 +63,7 @@ data WriteResult
= WriteAgain = WriteAgain
| WriteNeedsDrain | WriteNeedsDrain
| WriteEnded | WriteEnded
derive instance Generic WriteResult _ derive instance Generic WriteResult _
derive instance Eq WriteResult derive instance Eq WriteResult
derive instance Ord WriteResult derive instance Ord WriteResult
@ -70,96 +72,148 @@ instance Show WriteResult where show = genericShow
data ReadResult a data ReadResult a
= ReadOk a = ReadOk a
| ReadWouldBlock | ReadWouldBlock
derive instance Generic (ReadResult a) _ derive instance Generic (ReadResult a) _
derive instance Eq a => Eq (ReadResult a) derive instance Eq a => Eq (ReadResult a)
derive instance Ord a => Ord (ReadResult a) derive instance Ord a => Ord (ReadResult a)
derive instance Functor ReadResult derive instance Functor ReadResult
derive instance Foldable ReadResult
derive instance Traversable ReadResult
instance Show a => Show (ReadResult a) where show = genericShow instance Show a => Show (ReadResult a) where show = genericShow
type AsyncIO a b m r =
{ write :: a -> m WriteResult
, read :: m (ReadResult b)
, awaitWrite :: m WriteSignal
, awaitRead :: m ReadSignal
}
/\ AsyncPipe a b m r
-- | An `AsyncPipe` is a `Pipe`-like struct that allows -- | An `AsyncPipe` is a `Pipe`-like struct that allows
-- | concurrently reading from a `Producer` and writing to a `Consumer`. -- | concurrently reading from a `Producer` and writing to a `Consumer`.
-- | -- |
-- | An implementation of `AsyncPipe` for Node `Transform` streams -- | An implementation of `AsyncPipe` for Node `Transform` streams
-- | is provided in `Pipes.Node.Stream`. -- | is provided in `Pipes.Node.Stream`.
-- | data AsyncPipe a b m r
-- | ## Fields -- | A pure return value
-- | - `m x` = Pure r
-- | - Initializer -- | An `AsyncPipe` behind a computation
-- | - `x -> a -> m WriteResult` | M (m (AsyncPipe a b m r))
-- | - Write a value `a` to the underlying resource -- | Interface to write & read from the backing resource
-- | - `x -> m WriteSignal` | AsyncIO (AsyncIO a b m r)
-- | - Block until the pipe is writable again (or writing must stop)
-- | - `x -> m (ReadResult b)`
-- | - Attempt to read a chunk
-- | - `x -> m ReadSignal`
-- | - Block until the pipe is readable again (or reading must stop)
data AsyncPipe x m a b =
AsyncPipe
(m x)
(x -> a -> m WriteResult)
(x -> m WriteSignal)
(x -> m (ReadResult b))
(x -> m ReadSignal)
instance Monad m => Functor (AsyncPipe x m a) where -- | Modify request / response types
map f (AsyncPipe init w aw r ar) = AsyncPipe init w aw (map (map f) <<< r) ar mapIO :: forall aa ab ba bb m r. Monad m => (ab -> aa) -> (ba -> bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
mapIO _ _ (Pure a) = Pure a
mapIO a b (M m) = M $ mapIO a b <$> m
mapIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
AsyncIO $ {write: write <<< a, awaitWrite, read: map b <$> read, awaitRead} /\ mapIO a b m
instance Monad m => Profunctor (AsyncPipe x m) where -- | Modify request / response types
dimap :: forall a b c d. (a -> b) -> (c -> d) -> _ b c -> _ a d bindIO :: forall aa ab ba bb m r. Monad m => (ab -> m aa) -> (ba -> m bb) -> AsyncPipe aa ba m r -> AsyncPipe ab bb m r
dimap ab cd (AsyncPipe init w aw r ar) = bindIO _ _ (Pure a) = Pure a
AsyncPipe bindIO a b (M m) = M $ bindIO a b <$> m
init bindIO a b (AsyncIO ({write, awaitWrite, read, awaitRead} /\ m)) =
(\x -> w x <<< ab) AsyncIO $ {write: flip bind write <<< a, awaitWrite, read: traverse b =<< read, awaitRead} /\ bindIO a b m
aw
(map (map cd) <<< r) -- | Remove the `AsyncPipe` wrapper by discarding the IO
ar stripIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m r
stripIO (Pure r) = pure r
stripIO (M m) = m >>= stripIO
stripIO (AsyncIO (_ /\ m)) = stripIO m
-- | Execute the `AsyncPipe` monad stack until `AsyncIO` is reached (if any)
getAsyncIO :: forall a b m r. Monad m => AsyncPipe a b m r -> m (Maybe (AsyncIO a b m r))
getAsyncIO (AsyncIO a) = pure $ Just a
getAsyncIO (M m) = m >>= getAsyncIO
getAsyncIO (Pure _) = pure Nothing
instance MonadTrans (AsyncPipe a b) where
lift = M <<< map Pure
instance Monad m => Functor (AsyncPipe a b m) where
map f (Pure r) = Pure $ f r
map f (M m) = M $ map f <$> m
map f (AsyncIO (io /\ m)) = AsyncIO $ io /\ (f <$> m)
instance Monad m => Apply (AsyncPipe a b m) where
apply (Pure f) ma = f <$> ma
apply (M mf) ma = M $ (_ <*> ma) <$> mf
apply (AsyncIO (io /\ mf)) ma = AsyncIO $ io /\ (mf <*> ma)
instance Monad m => Applicative (AsyncPipe a b m) where
pure = Pure
instance Monad m => Bind (AsyncPipe a b m) where
bind (Pure a) f = f a
bind (M ma) f = M $ (_ >>= f) <$> ma
bind (AsyncIO (io /\ m)) f = AsyncIO $ io /\ (m >>= f)
instance Monad m => Monad (AsyncPipe a b m)
instance MonadThrow e m => MonadThrow e (AsyncPipe a b m) where
throwError = lift <<< throwError
instance MonadError e m => MonadError e (AsyncPipe a b m) where
catchError m f = lift $ catchError (stripIO m) (stripIO <<< f)
instance MonadEffect m => MonadEffect (AsyncPipe a b m) where
liftEffect = lift <<< liftEffect
instance MonadAff m => MonadAff (AsyncPipe a b m) where
liftAff = lift <<< liftAff
-- | Wraps all fields of an `AsyncPipe` with logging to debug -- | Wraps all fields of an `AsyncPipe` with logging to debug
-- | behavior and timing. -- | behavior and timing.
debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m (Maybe a) (Maybe b) -> AsyncPipe x m (Maybe a) (Maybe b) debug :: forall a b m r. MonadAff m => String -> AsyncPipe (Maybe a) (Maybe b) m r -> AsyncPipe (Maybe a) (Maybe b) m r
debug c (AsyncPipe init write awaitWrite read awaitRead) = debug c m =
let let
logL m = liftEffect $ log $ "[" <> c <> "] " <> m logL :: forall m'. MonadEffect m' => _ -> m' Unit
logR m = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> m logL msg = liftEffect $ log $ "[" <> c <> "] " <> msg
logR :: forall m'. MonadEffect m' => _ -> m' Unit
logR msg = liftEffect $ log $ "[" <> c <> "] " <> fold (Array.replicate 20 " ") <> msg
time :: forall a'. m a' -> m (Milliseconds /\ a') time :: forall m' a'. MonadEffect m' => m' a' -> m' (Milliseconds /\ a')
time m = do time ma = do
start <- liftEffect Now.now start <- liftEffect Now.now
a <- m a <- ma
end <- liftEffect Now.now end <- liftEffect Now.now
pure $ (end `Instant.diff` start) /\ a pure $ (end `Instant.diff` start) /\ a
in
init' = do flip bind (fromMaybe m)
logL "init >" $ runMaybeT do
elapsed /\ x <- time init (io /\ done') <- MaybeT $ lift $ getAsyncIO m
logL $ "< init " <> "(" <> show (unwrap elapsed) <> "ms)" let
pure x write a = do
write' x a = do
logL "write >" logL "write >"
elapsed /\ w <- time $ write x a elapsed /\ w <- time $ io.write a
logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)" logL $ "< write " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w pure w
read' x = do read = do
logR "read >" logR "read >"
elapsed /\ r <- time $ read x elapsed /\ r <- time $ io.read
logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)" logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)"
pure r pure r
awaitWrite' x = do awaitWrite = do
logL "awaitWrite >" logL "awaitWrite >"
elapsed /\ w <- time $ awaitWrite x elapsed /\ w <- time $ io.awaitWrite
logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)" logL $ "< awaitWrite " <> show w <> " (" <> show (unwrap elapsed) <> "ms)"
pure w pure w
awaitRead' x = do awaitRead = do
logR "awaitRead >" logR "awaitRead >"
elapsed /\ r <- time $ awaitRead x elapsed /\ r <- time $ io.awaitRead
logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)" logR $ "< awaitRead " <> show r <> " (" <> show (unwrap elapsed) <> "ms)"
pure r pure r
in
AsyncPipe init' write' awaitWrite' read' awaitRead' done = do
logL "done >"
elapsed /\ r <- time done'
logL $ "< done (" <> show (unwrap elapsed) <> "ms)"
pure r
pure $ AsyncIO $ {write, read, awaitWrite, awaitRead} /\ done
-- | Convert an `AsyncPipe` to a regular `Pipe`. -- | Convert an `AsyncPipe` to a regular `Pipe`.
-- | -- |
@ -202,43 +256,47 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) =
-- | * `read` will pass chunks to `parse` as fast as `parse` allows -- | * `read` will pass chunks to `parse` as fast as `parse` allows
-- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready -- | * `parse` will parse chunks and yield them to `encode` as soon as they're ready
-- | * `encode` will encode chunks and yield them to `write` as soon as they're ready -- | * `encode` will encode chunks and yield them to `write` as soon as they're ready
sync :: forall x a b f p e m. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe x m (Maybe a) (Maybe b) -> Pipe (Maybe a) (Maybe b) m Unit sync :: forall a b f p e m r. MonadError e m => Alternative p => Parallel p m => MonadFork f m => MonadAff m => AsyncPipe (Maybe a) (Maybe b) m r -> Pipe (Maybe a) (Maybe b) m r
sync (AsyncPipe init write awaitWrite read awaitRead) = sync m =
let let
liftPipe :: forall r. (Proxy _ _ _ _ m) r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r liftPipe :: forall r'. (Proxy _ _ _ _ m) r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftPipe = lift liftPipe = lift
liftM :: forall r. m r -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r liftM :: forall r'. m r' -> ExceptT (Step _ _) (Proxy _ _ _ _ m) r'
liftM = liftPipe <<< lift liftM = liftPipe <<< lift
in
continue a = throwError (Loop a) lift (getAsyncIO m) >>=
break = throwError (Done unit) case _ of
Nothing -> lift $ stripIO m
awaitRW x = parOneOf [Right <$> awaitWrite x, Left <$> awaitRead x] Just ({write, awaitWrite, read, awaitRead} /\ done) ->
let
awaitRW onR onW =
liftM (parOneOf [Right <$> awaitWrite, Left <$> awaitRead])
>>= either (const onR) onW
wSignal WriteSignalOk = WriteAgain wSignal WriteSignalOk = WriteAgain
wSignal WriteSignalEnded = WriteEnded wSignal WriteSignalEnded = WriteEnded
tailRecEarly f a = tailRecM (map (either identity identity) <<< runExceptT <<< f) a
continue a = throwError (Loop a)
break = (Done <$> liftM (stripIO done)) >>= throwError
in do in do
x <- lift init flip tailRecEarly WriteAgain \writable -> do
flip tailRecM WriteAgain rb <- liftM read
\w ->
map (either identity identity)
$ runExceptT do
rb <- liftM $ read x
case rb of case rb of
ReadWouldBlock ReadWouldBlock
| w == WriteEnded -> liftM (awaitRead x) *> continue w | writable == WriteEnded -> liftM awaitRead *> continue writable
| w == WriteNeedsDrain -> liftM (awaitRW x) >>= either (const $ continue w) (continue <<< wSignal) | writable == WriteNeedsDrain -> awaitRW (continue writable) (continue <<< wSignal)
| otherwise -> pure unit | otherwise -> pure unit
ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue w ReadOk (Just b) -> liftPipe (yield $ Just b) *> continue writable
ReadOk Nothing -> liftPipe (yield Nothing) *> break ReadOk Nothing -> liftPipe (yield Nothing) *> break
when (w /= WriteAgain) $ continue w when (writable /= WriteAgain) $ continue writable
a <- liftPipe await a <- liftPipe await
w' <- liftM $ write x a writable' <- liftM $ write a
when (isNothing a) $ continue WriteEnded when (isNothing a) $ continue WriteEnded
pure $ Loop w' pure $ Loop writable'
-- | Implementation of `(>-/->)` -- | Implementation of `(>-/->)`
-- | -- |
@ -252,15 +310,18 @@ sync (AsyncPipe init write awaitWrite read awaitRead) =
-- | -- |
-- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown. -- | If the consuming half fails, the error is caught, the producing half is killed, and the error is rethrown.
pipeAsync pipeAsync
:: forall f m x a b :: forall f m a b
. MonadRec m . MonadRec m
=> MonadAff m => MonadAff m
=> MonadBracket Error f m => MonadBracket Error f m
=> Producer (Maybe a) m Unit => Producer (Maybe a) m Unit
-> AsyncPipe x m (Maybe a) (Maybe b) -> AsyncPipe (Maybe a) (Maybe b) m Unit
-> Producer (Maybe b) m Unit -> Producer (Maybe b) m Unit
pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) = pipeAsync prod m =
do lift (getAsyncIO m)
>>= case _ of
Nothing -> throwError $ error "`pipeAsync` invoked on `AsyncPipe` that did not have `AsyncIO`"
Just ({write, read, awaitWrite, awaitRead} /\ done) -> do
errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing errST :: STRef _ (Maybe Error) <- lift $ liftEffect $ liftST $ ST.Ref.new Nothing
killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false killST :: STRef _ Boolean <- lift $ liftEffect $ liftST $ ST.Ref.new false
@ -270,26 +331,27 @@ pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) =
putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just putThreadError = void <<< liftEffect <<< liftST <<< flip ST.Ref.write errST <<< Just
getThreadError = liftEffect $ liftST $ ST.Ref.read errST getThreadError = liftEffect $ liftST $ ST.Ref.read errST
rx x a = do rx a = do
killed <- threadKilled killed <- threadKilled
guard $ not killed guard $ not killed
w <- lift $ write x a w <- lift $ write a
case w of case w of
WriteNeedsDrain -> lift $ void $ awaitWrite x WriteNeedsDrain -> lift $ void awaitWrite
WriteEnded -> empty WriteEnded -> empty
WriteAgain -> pure unit WriteAgain -> pure unit
spawn = lift <<< fork <<< flip catchError putThreadError spawn = lift <<< fork <<< flip catchError putThreadError
x <- lift init _thread <- spawn $ void $ runMaybeT $ Collect.foreach rx (hoist lift prod)
_thread <- spawn $ void $ runMaybeT $ Collect.foreach (rx x) (hoist lift prod)
flip tailRecM unit $ const do flip tailRecM unit $ const do
getThreadError >>= traverse_ throwError getThreadError >>= traverse_ throwError
rb <- lift $ read x rb <- lift read
case rb of case rb of
ReadOk (Just b) -> yield (Just b) $> Loop unit ReadOk (Just b) -> yield (Just b) $> Loop unit
ReadOk Nothing -> killThread *> yield Nothing $> Done unit ReadOk Nothing -> killThread *> yield Nothing $> Done unit
ReadWouldBlock -> void (lift (awaitRead x)) $> Loop unit ReadWouldBlock -> void (lift awaitRead) $> Loop unit
lift $ stripIO done
infixl 7 pipeAsync as >-/-> infixl 7 pipeAsync as >-/->

View File

@ -5,8 +5,6 @@ import Prelude hiding (join)
import Control.Monad.Error.Class (class MonadThrow, throwError) import Control.Monad.Error.Class (class MonadThrow, throwError)
import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM)
import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Class (liftST)
import Control.Monad.ST.Global (Global)
import Control.Monad.ST.Ref (STRef)
import Control.Monad.ST.Ref as ST.Ref import Control.Monad.ST.Ref as ST.Ref
import Control.Monad.ST.Ref as STRef import Control.Monad.ST.Ref as STRef
import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Class (lift)
@ -15,9 +13,9 @@ import Data.Traversable (for_, traverse_)
import Data.Tuple.Nested ((/\)) import Data.Tuple.Nested ((/\))
import Effect (Effect) import Effect (Effect)
import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Aff.Class (class MonadAff, liftAff)
import Effect.Class (class MonadEffect, liftEffect) import Effect.Class (liftEffect)
import Effect.Exception (Error) import Effect.Exception (Error)
import Node.Stream.Object (WriteResult(..), maybeReadResult) import Node.Stream.Object (ReadResult(..), WriteResult(..))
import Node.Stream.Object as O import Node.Stream.Object as O
import Pipes (await, yield) import Pipes (await, yield)
import Pipes (for) as P import Pipes (for) as P
@ -90,77 +88,60 @@ fromWritable w = do
tailRecM go unit tailRecM go unit
newtype TransformContext a b = fromTransformEffect
TransformContext :: forall a b m
{ stream :: O.Transform a b . MonadThrow Error m
, removeErrorListener :: Effect Unit => MonadAff m
, errorST :: STRef Global (Maybe Error) => Effect (O.Transform a b)
} -> AsyncPipe (Maybe a) (Maybe b) m Unit
fromTransformEffect = fromTransform <=< liftEffect
transformCleanup :: forall m a b. MonadEffect m => TransformContext a b -> m Unit
transformCleanup (TransformContext {removeErrorListener}) = do
liftEffect removeErrorListener
transformStream :: forall a b. TransformContext a b -> O.Transform a b
transformStream (TransformContext {stream}) = stream
transformRethrow :: forall m a b. MonadThrow Error m => MonadEffect m => TransformContext a b -> m Unit
transformRethrow (TransformContext {errorST}) = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST)
-- | Convert a `Transform` stream to an `AsyncPipe`. -- | Convert a `Transform` stream to an `AsyncPipe`.
fromTransform fromTransform
:: forall a b m :: forall a b m
. MonadThrow Error m . MonadThrow Error m
=> MonadAff m => MonadAff m
=> Effect (O.Transform a b) => O.Transform a b
-> AsyncPipe (TransformContext a b) m (Maybe a) (Maybe b) -> AsyncPipe (Maybe a) (Maybe b) m Unit
fromTransform t = fromTransform stream = do
let
init = do
stream <- liftEffect t
{ error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream { error: errorST, cancel: removeErrorListener } <- liftEffect $ O.withErrorST stream
pure $ TransformContext {errorST, removeErrorListener, stream} let
write x Nothing = do rethrow = traverse_ throwError =<< liftEffect (liftST $ ST.Ref.read errorST)
let s = transformStream x cleanup = liftEffect removeErrorListener
liftEffect $ O.end s
pure AsyncPipe.WriteEnded writeSignal =
write x (Just a) = do liftEffect (O.isWritableEnded stream)
transformRethrow x <#> if _ then AsyncPipe.WriteSignalEnded else AsyncPipe.WriteSignalOk
let s = transformStream x
w <- liftEffect $ O.write s a readSignal =
pure $ case w of liftEffect (O.isReadableEnded stream)
WriteOk -> AsyncPipe.WriteAgain <#> if _ then AsyncPipe.ReadSignalEnded else AsyncPipe.ReadSignalOk
WriteWouldBlock -> AsyncPipe.WriteNeedsDrain
awaitWrite x = do writeResult WriteOk = AsyncPipe.WriteAgain
transformRethrow x writeResult WriteWouldBlock = AsyncPipe.WriteNeedsDrain
let s = transformStream x
liftAff $ O.awaitWritableOrClosed s readResult (ReadJust a) = AsyncPipe.ReadOk (Just a)
ended <- liftEffect $ O.isWritableEnded s readResult ReadWouldBlock = AsyncPipe.ReadWouldBlock
if ended then
pure $ AsyncPipe.WriteSignalEnded awaitWritable = liftAff $ O.awaitWritableOrClosed stream
else do awaitReadable = liftAff $ O.awaitReadableOrClosed stream
liftAff $ O.awaitWritableOrClosed s
pure $ AsyncPipe.WriteSignalOk awaitWrite = rethrow *> awaitWritable *> writeSignal
read x = awaitRead = rethrow *> awaitReadable *> readSignal
do
transformRethrow x whenReadNotEnded m =
let s = transformStream x liftEffect (O.isReadableEnded stream)
readEnded <- liftEffect $ O.isReadableEnded s >>= if _ then pure $ AsyncPipe.ReadOk Nothing else m
if readEnded then do
pure $ AsyncPipe.ReadOk Nothing readNow = readResult <$> liftEffect (O.read stream)
else writeNow a = writeResult <$> liftEffect (O.write stream a)
maybe AsyncPipe.ReadWouldBlock (AsyncPipe.ReadOk <<< Just) <$> maybeReadResult <$> liftEffect (O.read s)
awaitRead x = do read = rethrow *> whenReadNotEnded readNow
transformRethrow x
let s = transformStream x write Nothing = liftEffect (O.end stream) $> AsyncPipe.WriteEnded
ended <- liftEffect $ O.isReadableEnded s write (Just a) = rethrow *> writeNow a
if ended then
pure $ AsyncPipe.ReadSignalEnded AsyncIO ({write, awaitWrite, read, awaitRead} /\ cleanup)
else do
liftAff $ O.awaitReadableOrClosed s
pure $ AsyncPipe.ReadSignalOk
in
AsyncPipe init write awaitWrite read awaitRead
-- | Given a `Producer` of values, wrap them in `Just`. -- | Given a `Producer` of values, wrap them in `Just`.
-- | -- |

View File

@ -13,33 +13,31 @@ import Node.Stream.Object as O
import Node.Zlib as Zlib import Node.Zlib as Zlib
import Node.Zlib.Types (ZlibStream) import Node.Zlib.Types (ZlibStream)
import Pipes.Async (AsyncPipe) import Pipes.Async (AsyncPipe)
import Pipes.Node.Stream (TransformContext, fromTransform) import Pipes.Node.Stream (fromTransform)
type X = TransformContext Buffer Buffer fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
fromZlib :: forall r m. MonadAff m => MonadThrow Error m => Effect (ZlibStream r) -> AsyncPipe X m (Maybe Buffer) (Maybe Buffer)
fromZlib z = fromZlib z =
fromTransform do do
raw <- liftEffect $ Zlib.toDuplex <$> z raw <- liftEffect $ Zlib.toDuplex <$> z
pure $ O.unsafeCoerceTransform raw fromTransform $ O.unsafeCoerceTransform raw
gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) gzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
gzip = fromZlib Zlib.createGzip gzip = fromZlib Zlib.createGzip
gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) gunzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
gunzip = fromZlib Zlib.createGunzip gunzip = fromZlib Zlib.createGunzip
unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) unzip :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
unzip = fromZlib Zlib.createUnzip unzip = fromZlib Zlib.createUnzip
inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) inflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
inflate = fromZlib Zlib.createInflate inflate = fromZlib Zlib.createInflate
deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) deflate :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
deflate = fromZlib Zlib.createDeflate deflate = fromZlib Zlib.createDeflate
brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) brotliCompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliCompress = fromZlib Zlib.createBrotliCompress brotliCompress = fromZlib Zlib.createBrotliCompress
brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe X m (Maybe Buffer) (Maybe Buffer) brotliDecompress :: forall m. MonadAff m => MonadThrow Error m => AsyncPipe (Maybe Buffer) (Maybe Buffer) m Unit
brotliDecompress = fromZlib Zlib.createBrotliDecompress brotliDecompress = fromZlib Zlib.createBrotliDecompress

View File

@ -150,8 +150,8 @@ spec =
cbor :: Buffer <- Pipe.Collect.toBuffer cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a $ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransform csvDecode >-/-> Pipe.Node.fromTransformEffect csvDecode
>-/-> Pipe.Node.fromTransform cborEncode >-/-> Pipe.Node.fromTransformEffect cborEncode
>-> Pipe.Node.unEOS >-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
@ -163,8 +163,8 @@ spec =
cbor :: Buffer <- Pipe.Collect.toBuffer cbor :: Buffer <- Pipe.Collect.toBuffer
$ Pipe.FS.read a $ Pipe.FS.read a
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-> sync (Pipe.Node.fromTransform csvDecode) >-> sync (Pipe.Node.fromTransformEffect csvDecode)
>-> sync (Pipe.Node.fromTransform cborEncode) >-> sync (Pipe.Node.fromTransformEffect cborEncode)
>-> Pipe.Node.unEOS >-> Pipe.Node.unEOS
f :: Array Foreign <- liftEffect $ cborDecodeSync cbor f :: Array Foreign <- liftEffect $ cborDecodeSync cbor
ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f ppl <- traverse (liftEither <<< lmap (error <<< show) <<< runExcept <<< readImpl) f
@ -188,7 +188,7 @@ spec =
Pipe.Collect.toMonoid Pipe.Collect.toMonoid
$ Pipe.FS.read a $ Pipe.FS.read a
>-/-> Pipe.Zlib.gzip >-/-> Pipe.Zlib.gzip
>-/-> Pipe.Node.fromTransform slowTransform >-/-> Pipe.Node.fromTransformEffect slowTransform
>-/-> Pipe.Zlib.gunzip >-/-> Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS >-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8 >-> Pipe.Buffer.toString UTF8
@ -201,7 +201,7 @@ spec =
Pipe.Collect.toMonoid Pipe.Collect.toMonoid
$ Pipe.FS.read a $ Pipe.FS.read a
>-> sync Pipe.Zlib.gzip >-> sync Pipe.Zlib.gzip
>-> sync (Pipe.Node.fromTransform slowTransform) >-> sync (Pipe.Node.fromTransformEffect slowTransform)
>-> sync Pipe.Zlib.gunzip >-> sync Pipe.Zlib.gunzip
>-> Pipe.Node.unEOS >-> Pipe.Node.unEOS
>-> Pipe.Buffer.toString UTF8 >-> Pipe.Buffer.toString UTF8
@ -210,10 +210,19 @@ spec =
around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do around tmpFile $ it "file >-> discardTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo" liftEffect $ FS.writeTextFile UTF8 p "foo"
r <- reader p r <- reader p
out :: List.List Int <- Pipe.toListM $ r >-/-> Pipe.Node.fromTransform discardTransform >-> Pipe.Node.unEOS out :: List.List Int <-
Pipe.toListM
$ r
>-/-> Pipe.Node.fromTransformEffect discardTransform
>-> Pipe.Node.unEOS
out `shouldEqual` List.Nil out `shouldEqual` List.Nil
around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do around tmpFile $ it "file >-> charsTransform" \(p :: String) -> do
liftEffect $ FS.writeTextFile UTF8 p "foo bar" liftEffect $ FS.writeTextFile UTF8 p "foo bar"
r <- reader p r <- reader p
out :: List.List String <- Pipe.toListM $ r >-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8) >-/-> Pipe.Node.fromTransform charsTransform >-> Pipe.Node.unEOS out :: List.List String <-
Pipe.toListM $
r
>-> Pipe.Node.inEOS (Pipe.Buffer.toString UTF8)
>-/-> Pipe.Node.fromTransformEffect charsTransform
>-> Pipe.Node.unEOS
out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ] out `shouldEqual` List.fromFoldable [ "f", "o", "o", " ", "b", "a", "r" ]