From de22f44f86ec470df73d7b6f11bc1dec64a67bbe Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Tue, 14 May 2024 12:43:38 -0500 Subject: [PATCH] wip: explore removing delays(9) --- spago.lock | 1 + spago.yaml | 1 + src/Pipes.Node.Stream.purs | 11 ++++++----- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/spago.lock b/spago.lock index 1b8e2df..768b597 100644 --- a/spago.lock +++ b/spago.lock @@ -13,6 +13,7 @@ workspace: - lists: ">=7.0.0 <8.0.0" - maybe: ">=6.0.0 <7.0.0" - mmorph: ">=7.0.0 <8.0.0" + - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - node-fs: ">=9.1.0 <10.0.0" diff --git a/spago.yaml b/spago.yaml index 2dec2be..27c5b52 100644 --- a/spago.yaml +++ b/spago.yaml @@ -20,6 +20,7 @@ package: - lists: ">=7.0.0 <8.0.0" - maybe: ">=6.0.0 <7.0.0" - mmorph: ">=7.0.0 <8.0.0" + - newtype: ">=5.0.0 <6.0.0" - node-buffer: ">=9.0.0 <10.0.0" - node-event-emitter: ">=3.0.0 <4.0.0" - node-fs: ">=9.1.0 <10.0.0" diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index 2d5e1c8..5cc5a5e 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -8,12 +8,13 @@ import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Ref as STRef import Control.Monad.Trans.Class (lift) import Data.Maybe (Maybe(..)) +import Data.Newtype (wrap) import Data.Traversable (for_) import Data.Tuple.Nested ((/\)) +import Effect.Aff (delay) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Class (liftEffect) import Effect.Exception (Error) -import Node.Stream.Object (needsDrain) import Node.Stream.Object as O import Pipes (await, yield) import Pipes (for) as P @@ -63,17 +64,16 @@ fromWritable w = err <- liftEffect $ liftST $ STRef.read error for_ err throwError + needsDrain <- liftEffect $ O.needsDrain w + when needsDrain $ liftAff $ O.awaitWritableOrClosed w ma <- await case ma of Nothing -> cleanup cancel Just a -> do res <- liftEffect $ O.write w a case res of - O.WriteOk -> pure $ Loop { error, cancel } - O.WriteWouldBlock -> do - liftAff (O.awaitWritableOrClosed w) - pure $ Loop { error, cancel } O.WriteClosed -> cleanup cancel + _ -> pure $ Loop { error, cancel } in do r <- liftEffect $ O.withErrorST w @@ -111,6 +111,7 @@ fromTransform t = needsDrain <- liftEffect $ O.needsDrain t if needsDrain then do + liftAff $ delay $ wrap 0.0 liftAff $ O.awaitReadableOrClosed t yieldWhileReadable pure $ Loop {error, cancel}