From 3c8e497fa23c1b8c13c04d8ca65c74336e62c282 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Fri, 10 May 2024 18:02:04 -0500 Subject: [PATCH] fix: explicit tailRecM --- src/Control.Monad.Cleanup.Class.purs | 24 --------------------- src/Pipes.Node.Stream.purs | 31 ++++++++++++++++++---------- 2 files changed, 20 insertions(+), 35 deletions(-) delete mode 100644 src/Control.Monad.Cleanup.Class.purs diff --git a/src/Control.Monad.Cleanup.Class.purs b/src/Control.Monad.Cleanup.Class.purs deleted file mode 100644 index 683a1fc..0000000 --- a/src/Control.Monad.Cleanup.Class.purs +++ /dev/null @@ -1,24 +0,0 @@ -module Control.Monad.Cleanup where - -import Prelude - -import Control.Monad.Error.Class (class MonadError, liftEither, try) -import Control.Monad.State (StateT, modify_, runStateT) -import Data.Tuple.Nested ((/\)) - -type CleanupT m = StateT (m Unit) m - -finally :: forall m. Monad m => (m Unit) -> CleanupT m Unit -finally m = modify_ (_ *> m) - -runCleanup :: forall m a. Monad m => CleanupT m a -> m a -runCleanup m = do - a /\ final <- runStateT m (pure unit) - final - pure a - -runCleanupE :: forall e m a. MonadError e m => CleanupT m a -> m a -runCleanupE m = do - ea /\ final <- runStateT (try m) (pure unit) - final - liftEither ea diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index 5712e18..d07dbc9 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -3,7 +3,7 @@ module Pipes.Node.Stream where import Prelude import Control.Monad.Error.Class (throwError) -import Control.Monad.Rec.Class (whileJust) +import Control.Monad.Rec.Class (Step(..), tailRecM, whileJust) import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Ref as STRef import Control.Monad.Trans.Class (lift) @@ -28,6 +28,8 @@ fromReadable r = let cleanup rmErrorListener = do liftEffect rmErrorListener + pure $ Done unit + go {error, cancel} = do liftAff $ delay $ wrap 0.0 err <- liftEffect $ liftST $ STRef.read error @@ -35,12 +37,12 @@ fromReadable r = res <- liftEffect $ O.read r case res of - O.ReadJust a -> yield (Just a) *> go {error, cancel} - O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) *> go {error, cancel} + O.ReadJust a -> yield (Just a) $> Loop {error, cancel} + O.ReadWouldBlock -> lift (O.awaitReadableOrClosed r) $> Loop {error, cancel} O.ReadClosed -> yield Nothing *> cleanup cancel in do e <- liftEffect $ O.withErrorST r - go e + tailRecM go e -- | Convert a `Writable` stream to a `Pipe`. -- | @@ -52,6 +54,8 @@ fromWritable w = cleanup rmErrorListener = do liftEffect rmErrorListener liftEffect $ O.end w + pure $ Done unit + go {error, cancel} = do liftAff $ delay $ wrap 0.0 err <- liftEffect $ liftST $ STRef.read error @@ -63,12 +67,14 @@ fromWritable w = Just a -> do res <- liftEffect $ O.write w a case res of - O.WriteOk -> go {error, cancel} - O.WriteWouldBlock -> liftAff (O.awaitWritableOrClosed w) *> go {error, cancel} - O.WriteClosed -> pure unit + O.WriteOk -> pure $ Loop {error, cancel} + O.WriteWouldBlock -> do + liftAff (O.awaitWritableOrClosed w) + pure $ Loop {error, cancel} + O.WriteClosed -> cleanup cancel in do r <- liftEffect $ O.withErrorST w - go r + tailRecM go r -- | Convert a `Transform` stream to a `Pipe`. -- | @@ -81,6 +87,7 @@ fromTransform t = liftEffect $ O.end t liftEffect $ removeErrorListener fromReadable t + pure $ Done unit yieldFromReadableHalf = do res <- liftEffect (O.read t) case res of @@ -99,12 +106,14 @@ fromTransform t = res <- liftEffect $ O.write t a' yieldFromReadableHalf case res of - O.WriteOk -> go {error, cancel} - O.WriteWouldBlock -> lift (O.awaitWritableOrClosed t) *> go {error, cancel} O.WriteClosed -> cleanup cancel + O.WriteOk -> pure $ Loop {error, cancel} + O.WriteWouldBlock -> do + lift (O.awaitWritableOrClosed t) + pure $ Loop {error, cancel} in do r <- liftEffect $ O.withErrorST t - go r + tailRecM go r -- | Given a `Producer` of values, wrap them in `Just`. -- |