diff --git a/src/Pipes.Collect.purs b/src/Pipes.Collect.purs index 06f9dbf..2dd1852 100644 --- a/src/Pipes.Collect.purs +++ b/src/Pipes.Collect.purs @@ -27,8 +27,12 @@ traverse :: forall a b m. MonadRec m => (b -> a -> m b) -> b -> Producer a m Uni traverse f b0 p0 = flip tailRecM (p0 /\ b0) \(p /\ b) -> case p of - Respond a m -> Loop <$> (m unit /\ _) <$> f b a - M m -> Loop <$> (_ /\ b) <$> m + Respond a m -> do + b' <- f b a + pure $ Loop $ m unit /\ b' + M m -> do + n <- m + pure $ Loop $ (n /\ b) Request _ _ -> pure $ Done b Pure _ -> pure $ Done b diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index 2039fa8..5413224 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -1,6 +1,6 @@ module Pipes.Node.Stream where -import Prelude +import Prelude hiding (join) import Control.Monad.Error.Class (class MonadThrow, throwError) import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) @@ -90,15 +90,20 @@ fromTransform t = liftEffect $ removeErrorListener fromReadable t pure $ Done unit - yieldFromReadableHalf = - flip tailRecM unit $ const do - res <- liftEffect (O.read t) + + yieldWhileReadable = do + flip tailRecM unit \_ -> do + res <- liftEffect $ O.read t case res of - O.ReadJust a -> do - yield $ Just a - pure $ Loop unit - O.ReadWouldBlock -> pure $ Done unit - O.ReadClosed -> yield Nothing $> Done unit + O.ReadJust a -> yield (Just a) $> Loop unit + _ -> pure $ Done unit + + maybeYield1 = do + res <- liftEffect $ O.read t + case res of + O.ReadJust a -> yield $ Just a + _ -> pure unit + go { error, cancel } = do err <- liftEffect $ liftST $ STRef.read error for_ err throwError @@ -108,12 +113,13 @@ fromTransform t = Nothing -> cleanup cancel Just a' -> do res <- liftEffect $ O.write t a' - yieldFromReadableHalf case res of O.WriteClosed -> cleanup cancel - O.WriteOk -> pure $ Loop { error, cancel } + O.WriteOk -> do + maybeYield1 + pure $ Loop { error, cancel } O.WriteWouldBlock -> do - yieldFromReadableHalf + yieldWhileReadable liftAff $ O.awaitWritableOrClosed t pure $ Loop { error, cancel } in