From d76f55e267466c6a5c72e463c99bf1667d46e7e1 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Fri, 21 Jun 2024 13:21:19 -0500 Subject: [PATCH] fix: introduced transform bug --- src/Node.Stream.Object.purs | 17 ++++++++++++----- src/Pipes.Node.Stream.purs | 3 ++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/Node.Stream.Object.purs b/src/Node.Stream.Object.purs index 3b73383..774467a 100644 --- a/src/Node.Stream.Object.purs +++ b/src/Node.Stream.Object.purs @@ -172,9 +172,10 @@ unsafeFromStringWritable :: forall r. Stream.Writable r -> Writable String unsafeFromStringWritable = unsafeCoerce awaitReadableOrClosed :: forall s a. Read s a => s -> Aff Unit -awaitReadableOrClosed s = do +awaitReadableOrClosed s = Aff.supervise do fiber <- - Aff.forkAff $ parOneOf + Aff.forkAff + $ parOneOf [ onceAff0 readableH s $> Right unit , onceAff0 closeH s $> Right unit , Left <$> onceAff1 errorH s @@ -189,14 +190,20 @@ awaitReadableOrClosed s = do Aff.killFiber (error "") fiber awaitFinished :: forall s a. Write s a => s -> Aff Unit -awaitFinished s = do +awaitFinished s = Aff.supervise do fiber <- Aff.forkAff $ onceAff0 finishH s finished <- liftEffect $ isWritableFinished s if not finished then Aff.joinFiber fiber else Aff.killFiber (error "") fiber awaitWritableOrClosed :: forall s a. Write s a => s -> Aff Unit -awaitWritableOrClosed s = do - fiber <- Aff.forkAff $ parOneOf [ onceAff0 drainH s $> Right unit, onceAff0 closeH s $> Right unit, Left <$> onceAff1 errorH s ] +awaitWritableOrClosed s = Aff.supervise do + fiber <- + Aff.forkAff + $ parOneOf + [ onceAff0 drainH s $> Right unit + , onceAff0 closeH s $> Right unit + , Left <$> onceAff1 errorH s + ] closed <- liftEffect $ isClosed s writeEnded <- liftEffect $ isWritableEnded s writable <- liftEffect $ isWritable s diff --git a/src/Pipes.Node.Stream.purs b/src/Pipes.Node.Stream.purs index eb16461..f75aee3 100644 --- a/src/Pipes.Node.Stream.purs +++ b/src/Pipes.Node.Stream.purs @@ -7,6 +7,7 @@ import Control.Monad.Rec.Class (class MonadRec, Step(..), tailRecM, whileJust) import Control.Monad.ST.Class (liftST) import Control.Monad.ST.Ref as STRef import Control.Monad.Trans.Class (lift) +import Control.Parallel (parOneOf) import Data.Maybe (Maybe(..), maybe) import Data.Traversable (for_, traverse, traverse_) import Data.Tuple.Nested ((/\)) @@ -118,7 +119,7 @@ fromTransform t = do ended <- liftEffect $ O.isWritableEnded t if needsDrain then do yieldWhileReadable - liftAff $ O.awaitWritableOrClosed t + liftAff $ parOneOf [O.awaitWritableOrClosed t, O.awaitReadableOrClosed t] pure $ Loop unit else if ended then cleanup $> Done unit