diff --git a/src/Pipes.Async.purs b/src/Pipes.Async.purs index 14ee46a..f589254 100644 --- a/src/Pipes.Async.purs +++ b/src/Pipes.Async.purs @@ -8,7 +8,7 @@ import Control.Monad.Except (ExceptT, runExceptT) import Control.Monad.Fork.Class (class MonadBracket, class MonadFork, fork) import Control.Monad.Maybe.Trans (runMaybeT) import Control.Monad.Morph (hoist) -import Control.Monad.Rec.Class (class MonadRec, Step(..), forever, tailRecM) +import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM) import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Ref (STRef) import Control.Monad.ST.Ref as ST.Ref @@ -116,7 +116,7 @@ instance Monad m => Profunctor (AsyncPipe x m) where -- | Wraps all fields of an `AsyncPipe` with logging to debug -- | behavior and timing. -debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m a b -> AsyncPipe x m a b +debug :: forall x a b m. MonadAff m => String -> AsyncPipe x m (Maybe a) (Maybe b) -> AsyncPipe x m (Maybe a) (Maybe b) debug c (AsyncPipe init write awaitWrite read awaitRead) = let logL m = liftEffect $ log $ "[" <> c <> "] " <> m @@ -144,7 +144,7 @@ debug c (AsyncPipe init write awaitWrite read awaitRead) = read' x = do logR "read >" elapsed /\ r <- time $ read x - logR $ "< read " <> show (r $> unit) <> " (" <> show (unwrap elapsed) <> "ms)" + logR $ "< read " <> show (map (const unit) <$> r) <> " (" <> show (unwrap elapsed) <> "ms)" pure r awaitWrite' x = do @@ -284,12 +284,12 @@ pipeAsync prod (AsyncPipe init write awaitWrite read awaitRead) = x <- lift init _thread <- spawn $ void $ runMaybeT $ Collect.foreach (rx x) (hoist lift prod) - forever do + flip tailRecM unit $ const do getThreadError >>= traverse_ throwError rb <- lift $ read x case rb of - ReadOk (Just b) -> yield $ Just b - ReadOk Nothing -> killThread *> yield Nothing - ReadWouldBlock -> void $ lift (awaitRead x) + ReadOk (Just b) -> yield (Just b) $> Loop unit + ReadOk Nothing -> killThread *> yield Nothing $> Done unit + ReadWouldBlock -> void (lift (awaitRead x)) $> Loop unit infixl 7 pipeAsync as >-/-> diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index 5a2d4fc..3bcb662 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -147,7 +147,6 @@ fromTransform t = let s = transformStream x readEnded <- liftEffect $ O.isReadableEnded s if readEnded then do - transformCleanup x pure $ AsyncPipe.ReadOk Nothing else maybe AsyncPipe.ReadWouldBlock (AsyncPipe.ReadOk <<< Just) <$> maybeReadResult <$> liftEffect (O.read s)