diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index bf451d6..2d5e1c8 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -13,6 +13,7 @@ import Data.Tuple.Nested ((/\)) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Class (liftEffect) import Effect.Exception (Error) +import Node.Stream.Object (needsDrain) import Node.Stream.Object as O import Pipes (await, yield) import Pipes (for) as P @@ -108,23 +109,25 @@ fromTransform t = err <- liftEffect $ liftST $ STRef.read error for_ err throwError - yieldWhileReadable - ma <- await - case ma of - Nothing -> cleanup cancel - Just a' -> do - res <- liftEffect $ O.write t a' - case res of - O.WriteClosed -> cleanup cancel - O.WriteOk -> do - maybeYield1 - pure $ Loop { error, cancel } - O.WriteWouldBlock -> do - queueLen <- liftEffect $ O.readableLength t - when (queueLen == 0) $ liftAff $ O.awaitReadableOrClosed t - yieldWhileReadable - liftAff $ O.awaitWritableOrClosed t - pure $ Loop { error, cancel } + needsDrain <- liftEffect $ O.needsDrain t + if needsDrain then do + liftAff $ O.awaitReadableOrClosed t + yieldWhileReadable + pure $ Loop {error, cancel} + else do + ma <- await + case ma of + Nothing -> cleanup cancel + Just a' -> do + res <- liftEffect $ O.write t a' + case res of + O.WriteClosed -> cleanup cancel + O.WriteOk -> do + maybeYield1 + pure $ Loop { error, cancel } + O.WriteWouldBlock -> do + yieldWhileReadable + pure $ Loop { error, cancel } in do r <- liftEffect $ O.withErrorST t