fix: explicit tailRecM
This commit is contained in:
parent
7a18a7182c
commit
3c8e497fa2
@ -1,24 +0,0 @@
|
|||||||
module Control.Monad.Cleanup where
|
|
||||||
|
|
||||||
import Prelude
|
|
||||||
|
|
||||||
import Control.Monad.Error.Class (class MonadError, liftEither, try)
|
|
||||||
import Control.Monad.State (StateT, modify_, runStateT)
|
|
||||||
import Data.Tuple.Nested ((/\))
|
|
||||||
|
|
||||||
type CleanupT m = StateT (m Unit) m
|
|
||||||
|
|
||||||
finally :: forall m. Monad m => (m Unit) -> CleanupT m Unit
|
|
||||||
finally m = modify_ (_ *> m)
|
|
||||||
|
|
||||||
runCleanup :: forall m a. Monad m => CleanupT m a -> m a
|
|
||||||
runCleanup m = do
|
|
||||||
a /\ final <- runStateT m (pure unit)
|
|
||||||
final
|
|
||||||
pure a
|
|
||||||
|
|
||||||
runCleanupE :: forall e m a. MonadError e m => CleanupT m a -> m a
|
|
||||||
runCleanupE m = do
|
|
||||||
ea /\ final <- runStateT (try m) (pure unit)
|
|
||||||
final
|
|
||||||
liftEither ea
|
|
@ -3,7 +3,7 @@ module Pipes.Node.Stream where
|
|||||||
import Prelude
|
import Prelude
|
||||||
|
|
||||||
import Control.Monad.Error.Class (throwError)
|
import Control.Monad.Error.Class (throwError)
|
||||||
import Control.Monad.Rec.Class (whileJust)
|
import Control.Monad.Rec.Class (Step(..), tailRecM, whileJust)
|
||||||
import Control.Monad.ST.Class (liftST)
|
import Control.Monad.ST.Class (liftST)
|
||||||
import Control.Monad.ST.Ref as STRef
|
import Control.Monad.ST.Ref as STRef
|
||||||
import Control.Monad.Trans.Class (lift)
|
import Control.Monad.Trans.Class (lift)
|
||||||
@ -28,6 +28,8 @@ fromReadable r =
|
|||||||
let
|
let
|
||||||
cleanup rmErrorListener = do
|
cleanup rmErrorListener = do
|
||||||
liftEffect rmErrorListener
|
liftEffect rmErrorListener
|
||||||
|
pure $ Done unit
|
||||||
|
|
||||||
go {error, cancel} = do
|
go {error, cancel} = do
|
||||||
liftAff $ delay $ wrap 0.0
|
liftAff $ delay $ wrap 0.0
|
||||||
err <- liftEffect $ liftST $ STRef.read error
|
err <- liftEffect $ liftST $ STRef.read error
|
||||||
@ -35,12 +37,12 @@ fromReadable r =
|
|||||||
|
|
||||||
res <- liftEffect $ O.read r
|
res <- liftEffect $ O.read r
|
||||||
case res of
|
case res of
|
||||||
O.ReadJust a -> yield (Just a) *> go {error, cancel}
|
O.ReadJust a -> yield (Just a) $> Loop {error, cancel}
|
||||||
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) *> go {error, cancel}
|
O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) $> Loop {error, cancel}
|
||||||
O.ReadClosed -> yield Nothing *> cleanup cancel
|
O.ReadClosed -> yield Nothing *> cleanup cancel
|
||||||
in do
|
in do
|
||||||
e <- liftEffect $ O.withErrorST r
|
e <- liftEffect $ O.withErrorST r
|
||||||
go e
|
tailRecM go e
|
||||||
|
|
||||||
-- | Convert a `Writable` stream to a `Pipe`.
|
-- | Convert a `Writable` stream to a `Pipe`.
|
||||||
-- |
|
-- |
|
||||||
@ -52,6 +54,8 @@ fromWritable w =
|
|||||||
cleanup rmErrorListener = do
|
cleanup rmErrorListener = do
|
||||||
liftEffect rmErrorListener
|
liftEffect rmErrorListener
|
||||||
liftEffect $ O.end w
|
liftEffect $ O.end w
|
||||||
|
pure $ Done unit
|
||||||
|
|
||||||
go {error, cancel} = do
|
go {error, cancel} = do
|
||||||
liftAff $ delay $ wrap 0.0
|
liftAff $ delay $ wrap 0.0
|
||||||
err <- liftEffect $ liftST $ STRef.read error
|
err <- liftEffect $ liftST $ STRef.read error
|
||||||
@ -63,12 +67,14 @@ fromWritable w =
|
|||||||
Just a -> do
|
Just a -> do
|
||||||
res <- liftEffect $ O.write w a
|
res <- liftEffect $ O.write w a
|
||||||
case res of
|
case res of
|
||||||
O.WriteOk -> go {error, cancel}
|
O.WriteOk -> pure $ Loop {error, cancel}
|
||||||
O.WriteWouldBlock -> liftAff (O.awaitWritableOrClosed w) *> go {error, cancel}
|
O.WriteWouldBlock -> do
|
||||||
O.WriteClosed -> pure unit
|
liftAff (O.awaitWritableOrClosed w)
|
||||||
|
pure $ Loop {error, cancel}
|
||||||
|
O.WriteClosed -> cleanup cancel
|
||||||
in do
|
in do
|
||||||
r <- liftEffect $ O.withErrorST w
|
r <- liftEffect $ O.withErrorST w
|
||||||
go r
|
tailRecM go r
|
||||||
|
|
||||||
-- | Convert a `Transform` stream to a `Pipe`.
|
-- | Convert a `Transform` stream to a `Pipe`.
|
||||||
-- |
|
-- |
|
||||||
@ -81,6 +87,7 @@ fromTransform t =
|
|||||||
liftEffect $ O.end t
|
liftEffect $ O.end t
|
||||||
liftEffect $ removeErrorListener
|
liftEffect $ removeErrorListener
|
||||||
fromReadable t
|
fromReadable t
|
||||||
|
pure $ Done unit
|
||||||
yieldFromReadableHalf = do
|
yieldFromReadableHalf = do
|
||||||
res <- liftEffect (O.read t)
|
res <- liftEffect (O.read t)
|
||||||
case res of
|
case res of
|
||||||
@ -99,12 +106,14 @@ fromTransform t =
|
|||||||
res <- liftEffect $ O.write t a'
|
res <- liftEffect $ O.write t a'
|
||||||
yieldFromReadableHalf
|
yieldFromReadableHalf
|
||||||
case res of
|
case res of
|
||||||
O.WriteOk -> go {error, cancel}
|
|
||||||
O.WriteWouldBlock -> lift (O.awaitWritableOrClosed t) *> go {error, cancel}
|
|
||||||
O.WriteClosed -> cleanup cancel
|
O.WriteClosed -> cleanup cancel
|
||||||
|
O.WriteOk -> pure $ Loop {error, cancel}
|
||||||
|
O.WriteWouldBlock -> do
|
||||||
|
lift (O.awaitWritableOrClosed t)
|
||||||
|
pure $ Loop {error, cancel}
|
||||||
in do
|
in do
|
||||||
r <- liftEffect $ O.withErrorST t
|
r <- liftEffect $ O.withErrorST t
|
||||||
go r
|
tailRecM go r
|
||||||
|
|
||||||
-- | Given a `Producer` of values, wrap them in `Just`.
|
-- | Given a `Producer` of values, wrap them in `Just`.
|
||||||
-- |
|
-- |
|
||||||
|
Loading…
Reference in New Issue
Block a user